Compare commits

..

No commits in common. "af3745684c20064de44a50103921f6c504ebb574" and "4fbd469c332d50f923859caad2cdd74c98913469" have entirely different histories.

11 changed files with 184 additions and 402 deletions

View File

@ -11,6 +11,9 @@ from typing import (
Type,
Union,
)
from contextvars import (
Context,
)
from msgspec import (
structs,
@ -24,7 +27,6 @@ import tractor
from tractor import (
_state,
MsgTypeError,
Context,
)
from tractor.msg import (
_codec,
@ -39,7 +41,7 @@ from tractor.msg import (
from tractor.msg.types import (
_payload_msgs,
log,
PayloadMsg,
Msg,
Started,
mk_msg_spec,
)
@ -59,7 +61,7 @@ def mk_custom_codec(
uid: tuple[str, str] = tractor.current_actor().uid
# XXX NOTE XXX: despite defining `NamespacePath` as a type
# field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair
# field on our `Msg.pld`, we still need a enc/dec_hook() pair
# to cast to/from that type on the wire. See the docs:
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
@ -319,12 +321,12 @@ def dec_type_union(
import importlib
types: list[Type] = []
for type_name in type_names:
for mod in [
for ns in [
typing,
importlib.import_module(__name__),
]:
if type_ref := getattr(
mod,
ns,
type_name,
False,
):
@ -742,7 +744,7 @@ def chk_pld_type(
# 'Error', .pld: ErrorData
codec: MsgCodec = mk_codec(
# NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified
# NOTE: this ONLY accepts `Msg.pld` fields of a specified
# type union.
ipc_pld_spec=payload_spec,
)
@ -750,7 +752,7 @@ def chk_pld_type(
# make a one-off dec to compare with our `MsgCodec` instance
# which does the below `mk_msg_spec()` call internally
ipc_msg_spec: Union[Type[Struct]]
msg_types: list[PayloadMsg[payload_spec]]
msg_types: list[Msg[payload_spec]]
(
ipc_msg_spec,
msg_types,
@ -759,7 +761,7 @@ def chk_pld_type(
)
_enc = msgpack.Encoder()
_dec = msgpack.Decoder(
type=ipc_msg_spec or Any, # like `PayloadMsg[Any]`
type=ipc_msg_spec or Any, # like `Msg[Any]`
)
assert (
@ -804,7 +806,7 @@ def chk_pld_type(
'cid': '666',
'pld': pld,
}
enc_msg: PayloadMsg = typedef(**kwargs)
enc_msg: Msg = typedef(**kwargs)
_wire_bytes: bytes = _enc.encode(enc_msg)
wire_bytes: bytes = codec.enc.encode(enc_msg)
@ -881,16 +883,25 @@ def test_limit_msgspec():
debug_mode=True
):
# ensure we can round-trip a boxing `PayloadMsg`
# ensure we can round-trip a boxing `Msg`
assert chk_pld_type(
payload_spec=Any,
pld=None,
# Msg,
Any,
None,
expect_roundtrip=True,
)
# TODO: don't need this any more right since
# `msgspec>=0.15` has the nice generics stuff yah??
#
# manually override the type annot of the payload
# field and ensure it propagates to all msg-subtypes.
# Msg.__annotations__['pld'] = Any
# verify that a mis-typed payload value won't decode
assert not chk_pld_type(
payload_spec=int,
# Msg,
int,
pld='doggy',
)
@ -902,16 +913,18 @@ def test_limit_msgspec():
value: Any
assert not chk_pld_type(
payload_spec=CustomPayload,
# Msg,
CustomPayload,
pld='doggy',
)
assert chk_pld_type(
payload_spec=CustomPayload,
# Msg,
CustomPayload,
pld=CustomPayload(name='doggy', value='urmom')
)
# yah, we can `.pause_from_sync()` now!
# uhh bc we can `.pause_from_sync()` now! :surfer:
# breakpoint()
trio.run(main)

View File

@ -1336,23 +1336,6 @@ def test_shield_pause(
child.expect(pexpect.EOF)
# TODO: better error for "non-ideal" usage from the root actor.
# -[ ] if called from an async scope emit a message that suggests
# using `await tractor.pause()` instead since it's less overhead
# (in terms of `greenback` and/or extra threads) and if it's from
# a sync scope suggest that usage must first call
# `ensure_portal()` in the (eventual parent) async calling scope?
def test_sync_pause_from_bg_task_in_root_actor_():
'''
When used from the root actor, normally we can only implicitly
support `.pause_from_sync()` from the main-parent-task (that
opens the runtime via `open_root_actor()`) since `greenback`
requires a `.ensure_portal()` call per `trio.Task` where it is
used.
'''
...
# TODO: needs ANSI code stripping tho, see `assert_before()` # above!
def test_correct_frames_below_hidden():
'''

View File

@ -19,7 +19,7 @@ from tractor._testing import (
@pytest.fixture
def run_example_in_subproc(
loglevel: str,
testdir: pytest.Testdir,
testdir,
reg_addr: tuple[str, int],
):

View File

@ -49,7 +49,6 @@ from ._exceptions import (
ModuleNotExposed as ModuleNotExposed,
MsgTypeError as MsgTypeError,
RemoteActorError as RemoteActorError,
TransportClosed as TransportClosed,
)
from .devx import (
breakpoint as breakpoint,

View File

@ -906,59 +906,8 @@ class StreamOverrun(
'''
class TransportClosed(trio.BrokenResourceError):
'''
IPC transport (protocol) connection was closed or broke and
indicates that the wrapping communication `Channel` can no longer
be used to send/receive msgs from the remote peer.
'''
def __init__(
self,
message: str,
loglevel: str = 'transport',
cause: BaseException|None = None,
raise_on_report: bool = False,
) -> None:
self.message: str = message
self._loglevel = loglevel
super().__init__(message)
if cause is not None:
self.__cause__ = cause
# flag to toggle whether the msg loop should raise
# the exc in its `TransportClosed` handler block.
self._raise_on_report = raise_on_report
def report_n_maybe_raise(
self,
message: str|None = None,
) -> None:
'''
Using the init-specified log level emit a logging report
for this error.
'''
message: str = message or self.message
# when a cause is set, slap it onto the log emission.
if cause := self.__cause__:
cause_tb_str: str = ''.join(
traceback.format_tb(cause.__traceback__)
)
message += (
f'{cause_tb_str}\n' # tb
f' {cause}\n' # exc repr
)
getattr(log, self._loglevel)(message)
# some errors we want to blow up from
# inside the RPC msg loop
if self._raise_on_report:
raise self from cause
class TransportClosed(trio.ClosedResourceError):
"Underlying channel transport was closed prior to use"
class NoResult(RuntimeError):

View File

@ -54,7 +54,7 @@ from tractor._exceptions import (
)
from tractor.msg import (
_ctxvar_MsgCodec,
# _codec, XXX see `self._codec` sanity/debug checks
_codec,
MsgCodec,
types as msgtypes,
pretty_struct,
@ -65,18 +65,8 @@ log = get_logger(__name__)
_is_windows = platform.system() == 'Windows'
def get_stream_addrs(
stream: trio.SocketStream
) -> tuple[
tuple[str, int], # local
tuple[str, int], # remote
]:
'''
Return the `trio` streaming transport prot's socket-addrs for
both the local and remote sides as a pair.
'''
# rn, should both be IP sockets
def get_stream_addrs(stream: trio.SocketStream) -> tuple:
# should both be IP sockets
lsockname = stream.socket.getsockname()
rsockname = stream.socket.getpeername()
return (
@ -85,22 +75,17 @@ def get_stream_addrs(
)
# from tractor.msg.types import MsgType
# ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..?
# => BLEH, except can't bc prots must inherit typevar or param-spec
# vars..
MsgType = TypeVar('MsgType')
# TODO: this should be our `Union[*msgtypes.__spec__]` now right?
MsgType = TypeVar("MsgType")
# TODO: consider using a generic def and indexing with our eventual
# msg definition/types?
# - https://docs.python.org/3/library/typing.html#typing.Protocol
# - https://jcristharif.com/msgspec/usage.html#structs
# TODO: break up this mod into a subpkg so we can start adding new
# backends and move this type stuff into a dedicated file.. Bo
#
@runtime_checkable
class MsgTransport(Protocol[MsgType]):
#
# ^-TODO-^ consider using a generic def and indexing with our
# eventual msg definition/types?
# - https://docs.python.org/3/library/typing.html#typing.Protocol
stream: trio.SocketStream
drained: list[MsgType]
@ -135,9 +120,9 @@ class MsgTransport(Protocol[MsgType]):
...
# TODO: typing oddity.. not sure why we have to inherit here, but it
# seems to be an issue with `get_msg_transport()` returning
# a `Type[Protocol]`; probably should make a `mypy` issue?
# TODO: not sure why we have to inherit here, but it seems to be an
# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``;
# probably should make a `mypy` issue?
class MsgpackTCPStream(MsgTransport):
'''
A ``trio.SocketStream`` delivering ``msgpack`` formatted data
@ -160,7 +145,7 @@ class MsgpackTCPStream(MsgTransport):
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
#
# TODO: define this as a `Codec` struct which can be
# overriden dynamically by the application/runtime?
# overriden dynamically by the application/runtime.
codec: tuple[
Callable[[Any], Any]|None, # coder
Callable[[type, Any], Any]|None, # decoder
@ -175,7 +160,7 @@ class MsgpackTCPStream(MsgTransport):
self._laddr, self._raddr = get_stream_addrs(stream)
# create read loop instance
self._aiter_pkts = self._iter_packets()
self._agen = self._iter_packets()
self._send_lock = trio.StrictFIFOLock()
# public i guess?
@ -189,12 +174,15 @@ class MsgpackTCPStream(MsgTransport):
# allow for custom IPC msg interchange format
# dynamic override Bo
self._task = trio.lowlevel.current_task()
# XXX for ctxvar debug only!
# self._codec: MsgCodec = (
# codec
# or
# _codec._ctxvar_MsgCodec.get()
self._codec: MsgCodec = (
codec
or
_codec._ctxvar_MsgCodec.get()
)
# TODO: mask out before release?
# log.runtime(
# f'New {self} created with codec\n'
# f'codec: {self._codec}\n'
# )
async def _iter_packets(self) -> AsyncGenerator[dict, None]:
@ -202,11 +190,6 @@ class MsgpackTCPStream(MsgTransport):
Yield `bytes`-blob decoded packets from the underlying TCP
stream using the current task's `MsgCodec`.
This is a streaming routine implemented as an async generator
func (which was the original design, but could be changed?)
and is allocated by a `.__call__()` inside `.__init__()` where
it is assigned to the `._aiter_pkts` attr.
'''
decodes_failed: int = 0
@ -221,82 +204,16 @@ class MsgpackTCPStream(MsgTransport):
# seem to be getting racy failures here on
# arbiter/registry name subs..
trio.BrokenResourceError,
) as trans_err:
loglevel = 'transport'
match trans_err:
# case (
# ConnectionResetError()
# ):
# loglevel = 'transport'
# peer actor (graceful??) TCP EOF but `tricycle`
# seems to raise a 0-bytes-read?
case ValueError() if (
'unclean EOF' in trans_err.args[0]
):
pass
# peer actor (task) prolly shutdown quickly due
# to cancellation
case trio.BrokenResourceError() if (
'Connection reset by peer' in trans_err.args[0]
):
pass
# unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn
# about it.
case _:
loglevel: str = 'warning'
raise TransportClosed(
message=(
f'IPC transport already closed by peer\n'
f'x)> {type(trans_err)}\n'
f' |_{self}\n'
),
loglevel=loglevel,
) from trans_err
# XXX definitely can happen if transport is closed
# manually by another `trio.lowlevel.Task` in the
# same actor; we use this in some simulated fault
# testing for ex, but generally should never happen
# under normal operation!
#
# NOTE: as such we always re-raise this error from the
# RPC msg loop!
except trio.ClosedResourceError as closure_err:
raise TransportClosed(
message=(
f'IPC transport already manually closed locally?\n'
f'x)> {type(closure_err)} \n'
f' |_{self}\n'
),
loglevel='error',
raise_on_report=(
closure_err.args[0] == 'another task closed this fd'
or
closure_err.args[0] in ['another task closed this fd']
),
) from closure_err
# graceful TCP EOF disconnect
if header == b'':
raise TransportClosed(
message=(
f'IPC transport already gracefully closed\n'
f')>\n'
f'|_{self}\n'
),
loglevel='transport',
# cause=??? # handy or no?
f'transport {self} was already closed prior ro read'
)
if header == b'':
raise TransportClosed(
f'transport {self} was already closed prior ro read'
)
size: int
size, = struct.unpack("<I", header)
log.transport(f'received header {size}') # type: ignore
@ -308,20 +225,33 @@ class MsgpackTCPStream(MsgTransport):
# the current `MsgCodec`.
codec: MsgCodec = _ctxvar_MsgCodec.get()
# XXX for ctxvar debug only!
# if self._codec.pld_spec != codec.pld_spec:
# TODO: mask out before release?
if self._codec.pld_spec != codec.pld_spec:
# assert (
# task := trio.lowlevel.current_task()
# ) is not self._task
# self._task = task
# self._codec = codec
# log.runtime(
# f'Using new codec in {self}.recv()\n'
# f'codec: {self._codec}\n\n'
# f'msg_bytes: {msg_bytes}\n'
# )
self._codec = codec
log.runtime(
f'Using new codec in {self}.recv()\n'
f'codec: {self._codec}\n\n'
f'msg_bytes: {msg_bytes}\n'
)
yield codec.decode(msg_bytes)
# TODO: remove, was only for orig draft impl
# testing.
#
# curr_codec: MsgCodec = _ctxvar_MsgCodec.get()
# obj = curr_codec.decode(msg_bytes)
# if (
# curr_codec is not
# _codec._def_msgspec_codec
# ):
# print(f'OBJ: {obj}\n')
#
# yield obj
# XXX NOTE: since the below error derives from
# `DecodeError` we need to catch is specially
# and always raise such that spec violations
@ -365,8 +295,7 @@ class MsgpackTCPStream(MsgTransport):
msg: msgtypes.MsgType,
strict_types: bool = True,
hide_tb: bool = False,
# hide_tb: bool = False,
) -> None:
'''
Send a msgpack encoded py-object-blob-as-msg over TCP.
@ -375,24 +304,21 @@ class MsgpackTCPStream(MsgTransport):
invalid msg type
'''
__tracebackhide__: bool = hide_tb
# XXX see `trio._sync.AsyncContextManagerMixin` for details
# on the `.acquire()`/`.release()` sequencing..
# __tracebackhide__: bool = hide_tb
async with self._send_lock:
# NOTE: lookup the `trio.Task.context`'s var for
# the current `MsgCodec`.
codec: MsgCodec = _ctxvar_MsgCodec.get()
# XXX for ctxvar debug only!
# if self._codec.pld_spec != codec.pld_spec:
# self._codec = codec
# log.runtime(
# f'Using new codec in {self}.send()\n'
# f'codec: {self._codec}\n\n'
# f'msg: {msg}\n'
# )
# TODO: mask out before release?
if self._codec.pld_spec != codec.pld_spec:
self._codec = codec
log.runtime(
f'Using new codec in {self}.send()\n'
f'codec: {self._codec}\n\n'
f'msg: {msg}\n'
)
if type(msg) not in msgtypes.__msg_types__:
if strict_types:
@ -426,16 +352,6 @@ class MsgpackTCPStream(MsgTransport):
size: bytes = struct.pack("<I", len(bytes_data))
return await self.stream.send_all(size + bytes_data)
# ?TODO? does it help ever to dynamically show this
# frame?
# try:
# <the-above_code>
# except BaseException as _err:
# err = _err
# if not isinstance(err, MsgTypeError):
# __tracebackhide__: bool = False
# raise
@property
def laddr(self) -> tuple[str, int]:
return self._laddr
@ -445,7 +361,7 @@ class MsgpackTCPStream(MsgTransport):
return self._raddr
async def recv(self) -> Any:
return await self._aiter_pkts.asend(None)
return await self._agen.asend(None)
async def drain(self) -> AsyncIterator[dict]:
'''
@ -462,7 +378,7 @@ class MsgpackTCPStream(MsgTransport):
yield msg
def __aiter__(self):
return self._aiter_pkts
return self._agen
def connected(self) -> bool:
return self.stream.socket.fileno() != -1
@ -517,7 +433,7 @@ class Channel:
# set after handshake - always uid of far end
self.uid: tuple[str, str]|None = None
self._aiter_msgs = self._iter_msgs()
self._agen = self._aiter_recv()
self._exc: Exception|None = None # set if far end actor errors
self._closed: bool = False
@ -581,6 +497,8 @@ class Channel:
)
return self._transport
# TODO: something simliar at the IPC-`Context`
# level so as to support
@cm
def apply_codec(
self,
@ -599,7 +517,6 @@ class Channel:
finally:
self._transport.codec = orig
# TODO: do a .src/.dst: str for maddrs?
def __repr__(self) -> str:
if not self._transport:
return '<Channel with inactive transport?>'
@ -643,43 +560,27 @@ class Channel:
)
return transport
# TODO: something like,
# `pdbp.hideframe_on(errors=[MsgTypeError])`
# instead of the `try/except` hack we have rn..
# seems like a pretty useful thing to have in general
# along with being able to filter certain stack frame(s / sets)
# possibly based on the current log-level?
async def send(
self,
payload: Any,
hide_tb: bool = False,
# hide_tb: bool = False,
) -> None:
'''
Send a coded msg-blob over the transport.
'''
__tracebackhide__: bool = hide_tb
try:
# __tracebackhide__: bool = hide_tb
log.transport(
'=> send IPC msg:\n\n'
f'{pformat(payload)}\n'
)
# assert self._transport # but why typing?
) # type: ignore
assert self._transport
await self._transport.send(
payload,
hide_tb=hide_tb,
# hide_tb=hide_tb,
)
except BaseException as _err:
err = _err # bind for introspection
if not isinstance(_err, MsgTypeError):
# assert err
__tracebackhide__: bool = False
else:
assert err.cid
raise
async def recv(self) -> Any:
assert self._transport
@ -716,11 +617,8 @@ class Channel:
await self.aclose(*args)
def __aiter__(self):
return self._aiter_msgs
return self._agen
# ?TODO? run any reconnection sequence?
# -[ ] prolly should be impl-ed as deco-API?
#
# async def _reconnect(self) -> None:
# """Handle connection failures by polling until a reconnect can be
# established.
@ -738,6 +636,7 @@ class Channel:
# else:
# log.transport("Stream connection re-established!")
# # TODO: run any reconnection sequence
# # on_recon = self._recon_seq
# # if on_recon:
# # await on_recon(self)
@ -751,17 +650,11 @@ class Channel:
# " for re-establishment")
# await trio.sleep(1)
async def _iter_msgs(
async def _aiter_recv(
self
) -> AsyncGenerator[Any, None]:
'''
Yield `MsgType` IPC msgs decoded and deliverd from
an underlying `MsgTransport` protocol.
This is a streaming routine alo implemented as an async-gen
func (same a `MsgTransport._iter_pkts()`) gets allocated by
a `.__call__()` inside `.__init__()` where it is assigned to
the `._aiter_msgs` attr.
Async iterate items from underlying stream.
'''
assert self._transport
@ -787,6 +680,15 @@ class Channel:
case _:
yield msg
# TODO: if we were gonna do this it should be
# done up at the `MsgStream` layer!
#
# sent = yield item
# if sent is not None:
# # optimization, passing None through all the
# # time is pointless
# await self._transport.send(sent)
except trio.BrokenResourceError:
# if not self._autorecon:

View File

@ -68,7 +68,7 @@ from .msg import (
MsgCodec,
PayloadT,
NamespacePath,
# pretty_struct,
pretty_struct,
_ops as msgops,
)
from tractor.msg.types import (
@ -89,16 +89,6 @@ if TYPE_CHECKING:
log = get_logger('tractor')
# ?TODO? move to a `tractor.lowlevel._rpc` with the below
# func-type-cases implemented "on top of" `@context` defs:
# -[ ] std async func helper decorated with `@rpc_func`?
# -[ ] `Portal.open_stream_from()` with async-gens?
# |_ possibly a duplex form of this with a
# `sent_from_peer = yield send_to_peer` form, which would require
# syncing the send/recv side with possibly `.receive_nowait()`
# on each `yield`?
# -[ ] some kinda `@rpc_acm` maybe that does a fixture style with
# user only defining a single-`yield` generator-func?
async def _invoke_non_context(
actor: Actor,
cancel_scope: CancelScope,
@ -118,9 +108,8 @@ async def _invoke_non_context(
] = trio.TASK_STATUS_IGNORED,
):
__tracebackhide__: bool = True
cs: CancelScope|None = None # ref when activated
# ?TODO? can we unify this with the `context=True` impl below?
# TODO: can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro):
await chan.send(
StartAck(
@ -171,6 +160,10 @@ async def _invoke_non_context(
functype='asyncgen',
)
)
# XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must
# manualy construct the response dict-packet-responses as
# above
with cancel_scope as cs:
ctx._scope = cs
task_status.started(ctx)
@ -182,13 +175,15 @@ async def _invoke_non_context(
await chan.send(
Stop(cid=cid)
)
# simplest function/method request-response pattern
# XXX: in the most minimally used case, just a scheduled internal runtime
# call to `Actor._cancel_task()` from the ctx-peer task since we
# don't (yet) have a dedicated IPC msg.
# ------ - ------
else:
# regular async function/method
# XXX: possibly just a scheduled `Actor._cancel_task()`
# from a remote request to cancel some `Context`.
# ------ - ------
# TODO: ideally we unify this with the above `context=True`
# block such that for any remote invocation ftype, we
# always invoke the far end RPC task scheduling the same
# way: using the linked IPC context machinery.
failed_resp: bool = False
try:
ack = StartAck(
@ -359,15 +354,8 @@ async def _errors_relayed_via_ipc(
# channel.
task_status.started(err)
# always propagate KBIs at the sys-process level.
if (
isinstance(err, KeyboardInterrupt)
# ?TODO? except when running in asyncio mode?
# |_ wut if you want to open a `@context` FROM an
# infected_aio task?
# and not actor.is_infected_aio()
):
# always reraise KBIs so they propagate at the sys-process level.
if isinstance(err, KeyboardInterrupt):
raise
# RPC task bookeeping.
@ -470,6 +458,7 @@ async def _invoke(
# tb: TracebackType = None
cancel_scope = CancelScope()
cs: CancelScope|None = None # ref when activated
ctx = actor.get_context(
chan=chan,
cid=cid,
@ -618,8 +607,6 @@ async def _invoke(
# `@context` marked RPC function.
# - `._portal` is never set.
try:
tn: trio.Nursery
rpc_ctx_cs: CancelScope
async with (
trio.open_nursery() as tn,
msgops.maybe_limit_plds(
@ -629,7 +616,7 @@ async def _invoke(
),
):
ctx._scope_nursery = tn
rpc_ctx_cs = ctx._scope = tn.cancel_scope
ctx._scope = tn.cancel_scope
task_status.started(ctx)
# TODO: better `trionics` tooling:
@ -655,7 +642,7 @@ async def _invoke(
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
# which cancels the scope presuming the input error
# is not a `.cancel_acked` pleaser.
if rpc_ctx_cs.cancelled_caught:
if ctx._scope.cancelled_caught:
our_uid: tuple = actor.uid
# first check for and raise any remote error
@ -665,7 +652,9 @@ async def _invoke(
if re := ctx._remote_error:
ctx._maybe_raise_remote_err(re)
if rpc_ctx_cs.cancel_called:
cs: CancelScope = ctx._scope
if cs.cancel_called:
canceller: tuple = ctx.canceller
explain: str = f'{ctx.side!r}-side task was cancelled by '
@ -691,14 +680,8 @@ async def _invoke(
elif canceller == ctx.chan.uid:
explain += f'its {ctx.peer_side!r}-side peer'
elif canceller == our_uid:
explain += 'itself'
elif canceller:
explain += 'a remote peer'
else:
explain += 'an unknown cause?'
explain += 'a remote peer'
explain += (
add_div(message=explain)
@ -928,10 +911,7 @@ async def process_messages(
f'IPC msg from peer\n'
f'<= {chan.uid}\n\n'
# TODO: use of the pprinting of structs is
# FRAGILE and should prolly not be
#
# avoid fmting depending on loglevel for perf?
# TODO: avoid fmting depending on loglevel for perf?
# -[ ] specifically `pretty_struct.pformat()` sub-call..?
# - how to only log-level-aware actually call this?
# -[ ] use `.msg.pretty_struct` here now instead!
@ -1197,7 +1177,7 @@ async def process_messages(
parent_chan=chan,
)
except TransportClosed as tc:
except TransportClosed:
# channels "breaking" (for TCP streams by EOF or 104
# connection-reset) is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out of
@ -1205,20 +1185,12 @@ async def process_messages(
# up..
#
# TODO: maybe add a teardown handshake? and,
# -[x] don't show this msg if it's an ephemeral discovery ep call?
# |_ see the below `.report_n_maybe_raise()` impl as well as
# tc-exc input details in `MsgpackTCPStream._iter_pkts()`
# for different read-failure cases.
# -[ ] don't show this msg if it's an ephemeral discovery ep call?
# -[ ] figure out how this will break with other transports?
tc.report_n_maybe_raise(
message=(
f'peer IPC channel closed abruptly?\n\n'
f'<=x {chan}\n'
f' |_{chan.raddr}\n\n'
)
+
tc.message
log.runtime(
f'IPC channel closed abruptly\n'
f'<=x peer: {chan.uid}\n'
f' |_{chan.raddr}\n'
)
# transport **WAS** disconnected
@ -1266,7 +1238,7 @@ async def process_messages(
'Exiting IPC msg loop with final msg\n\n'
f'<= peer: {chan.uid}\n'
f' |_{chan}\n\n'
# f'{pretty_struct.pformat(msg)}'
f'{pretty_struct.pformat(msg)}'
)
log.runtime(message)

View File

@ -54,12 +54,11 @@ LOG_FORMAT = (
DATE_FORMAT = '%b %d %H:%M:%S'
# FYI, ERROR is 40
# TODO: use a `bidict` to avoid the :155 check?
CUSTOM_LEVELS: dict[str, int] = {
'TRANSPORT': 5,
'RUNTIME': 15,
'DEVX': 17,
'CANCEL': 22,
'CANCEL': 18,
'PDB': 500,
}
STD_PALETTE = {
@ -148,8 +147,6 @@ class StackLevelAdapter(LoggerAdapter):
Delegate a log call to the underlying logger, after adding
contextual information from this adapter instance.
NOTE: all custom level methods (above) delegate to this!
'''
if self.isEnabledFor(level):
stacklevel: int = 3

View File

@ -34,9 +34,6 @@ from pprint import (
saferepr,
)
from tractor.log import get_logger
log = get_logger()
# TODO: auto-gen type sig for input func both for
# type-msgs and logging of RPC tasks?
# taken and modified from:
@ -146,13 +143,7 @@ def pformat(
else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
try:
val_str: str = saferepr(v)
except Exception:
log.exception(
'Failed to `saferepr({type(struct)})` !?\n'
)
return _Struct.__repr__(struct)
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
@ -203,20 +194,12 @@ class Struct(
return sin_props
pformat = pformat
# __repr__ = pformat
# __str__ = __repr__ = pformat
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
def __repr__(self) -> str:
try:
return pformat(self)
except Exception:
log.exception(
f'Failed to `pformat({type(self)})` !?\n'
)
return _Struct.__repr__(self)
__repr__ = pformat
def copy(
self,

View File

@ -156,12 +156,11 @@ class BroadcastState(Struct):
class BroadcastReceiver(ReceiveChannel):
'''
A memory receive channel broadcaster which is non-lossy for
the fastest consumer.
A memory receive channel broadcaster which is non-lossy for the
fastest consumer.
Additional consumer tasks can receive all produced values by
registering with ``.subscribe()`` and receiving from the new
instance it delivers.
Additional consumer tasks can receive all produced values by registering
with ``.subscribe()`` and receiving from the new instance it delivers.
'''
def __init__(

View File

@ -18,12 +18,8 @@
Async context manager primitives with hard ``trio``-aware semantics
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from contextlib import asynccontextmanager as acm
import inspect
from types import ModuleType
from typing import (
Any,
AsyncContextManager,
@ -34,16 +30,13 @@ from typing import (
Optional,
Sequence,
TypeVar,
TYPE_CHECKING,
)
import trio
from tractor._state import current_actor
from tractor.log import get_logger
if TYPE_CHECKING:
from tractor import ActorNursery
log = get_logger(__name__)
@ -53,10 +46,8 @@ T = TypeVar("T")
@acm
async def maybe_open_nursery(
nursery: trio.Nursery|ActorNursery|None = None,
nursery: trio.Nursery | None = None,
shield: bool = False,
lib: ModuleType = trio,
) -> AsyncGenerator[trio.Nursery, Any]:
'''
Create a new nursery if None provided.
@ -67,12 +58,13 @@ async def maybe_open_nursery(
if nursery is not None:
yield nursery
else:
async with lib.open_nursery() as nursery:
async with trio.open_nursery() as nursery:
nursery.cancel_scope.shield = shield
yield nursery
async def _enter_and_wait(
mngr: AsyncContextManager[T],
unwrapped: dict[int, T],
all_entered: trio.Event,
@ -99,6 +91,7 @@ async def _enter_and_wait(
@acm
async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]],
) -> AsyncGenerator[
@ -109,17 +102,15 @@ async def gather_contexts(
None,
]:
'''
Concurrently enter a sequence of async context managers (acms),
each from a separate `trio` task and deliver the unwrapped
`yield`-ed values in the same order once all managers have entered.
Concurrently enter a sequence of async context managers, each in
a separate ``trio`` task and deliver the unwrapped values in the
same order once all managers have entered. On exit all contexts are
subsequently and concurrently exited.
On exit, all acms are subsequently and concurrently exited.
This function is somewhat similar to a batch of non-blocking
calls to `contextlib.AsyncExitStack.enter_async_context()`
(inside a loop) *in combo with* a `asyncio.gather()` to get the
`.__aenter__()`-ed values, except the managers are both
concurrently entered and exited and *cancellation just works*(R).
This function is somewhat similar to common usage of
``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in
combo with ``asyncio.gather()`` except the managers are concurrently
entered and exited, and cancellation just works.
'''
seed: int = id(mngrs)
@ -219,10 +210,9 @@ async def maybe_open_context(
) -> AsyncIterator[tuple[bool, T]]:
'''
Maybe open an async-context-manager (acm) if there is not already
a `_Cached` version for the provided (input) `key` for *this* actor.
Return the `_Cached` instance on a _Cache hit.
Maybe open a context manager if there is not already a _Cached
version for the provided ``key`` for *this* actor. Return the
_Cached instance on a _Cache hit.
'''
fid = id(acm_func)
@ -283,13 +273,8 @@ async def maybe_open_context(
else:
_Cache.users += 1
log.runtime(
f'Re-using cached resource for user {_Cache.users}\n\n'
f'{ctx_key!r} -> {type(yielded)}\n'
# TODO: make this work with values but without
# `msgspec.Struct` causing frickin crashes on field-type
# lookups..
# f'{ctx_key!r} -> {yielded!r}\n'
f'Reusing resource for `_Cache` user {_Cache.users}\n\n'
f'{ctx_key!r} -> {yielded!r}\n'
)
lock.release()
yield True, yielded