Compare commits

..

No commits in common. "af3745684c20064de44a50103921f6c504ebb574" and "4fbd469c332d50f923859caad2cdd74c98913469" have entirely different histories.

11 changed files with 184 additions and 402 deletions

View File

@ -11,6 +11,9 @@ from typing import (
Type, Type,
Union, Union,
) )
from contextvars import (
Context,
)
from msgspec import ( from msgspec import (
structs, structs,
@ -24,7 +27,6 @@ import tractor
from tractor import ( from tractor import (
_state, _state,
MsgTypeError, MsgTypeError,
Context,
) )
from tractor.msg import ( from tractor.msg import (
_codec, _codec,
@ -39,7 +41,7 @@ from tractor.msg import (
from tractor.msg.types import ( from tractor.msg.types import (
_payload_msgs, _payload_msgs,
log, log,
PayloadMsg, Msg,
Started, Started,
mk_msg_spec, mk_msg_spec,
) )
@ -59,7 +61,7 @@ def mk_custom_codec(
uid: tuple[str, str] = tractor.current_actor().uid uid: tuple[str, str] = tractor.current_actor().uid
# XXX NOTE XXX: despite defining `NamespacePath` as a type # XXX NOTE XXX: despite defining `NamespacePath` as a type
# field on our `PayloadMsg.pld`, we still need a enc/dec_hook() pair # field on our `Msg.pld`, we still need a enc/dec_hook() pair
# to cast to/from that type on the wire. See the docs: # to cast to/from that type on the wire. See the docs:
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
@ -319,12 +321,12 @@ def dec_type_union(
import importlib import importlib
types: list[Type] = [] types: list[Type] = []
for type_name in type_names: for type_name in type_names:
for mod in [ for ns in [
typing, typing,
importlib.import_module(__name__), importlib.import_module(__name__),
]: ]:
if type_ref := getattr( if type_ref := getattr(
mod, ns,
type_name, type_name,
False, False,
): ):
@ -742,7 +744,7 @@ def chk_pld_type(
# 'Error', .pld: ErrorData # 'Error', .pld: ErrorData
codec: MsgCodec = mk_codec( codec: MsgCodec = mk_codec(
# NOTE: this ONLY accepts `PayloadMsg.pld` fields of a specified # NOTE: this ONLY accepts `Msg.pld` fields of a specified
# type union. # type union.
ipc_pld_spec=payload_spec, ipc_pld_spec=payload_spec,
) )
@ -750,7 +752,7 @@ def chk_pld_type(
# make a one-off dec to compare with our `MsgCodec` instance # make a one-off dec to compare with our `MsgCodec` instance
# which does the below `mk_msg_spec()` call internally # which does the below `mk_msg_spec()` call internally
ipc_msg_spec: Union[Type[Struct]] ipc_msg_spec: Union[Type[Struct]]
msg_types: list[PayloadMsg[payload_spec]] msg_types: list[Msg[payload_spec]]
( (
ipc_msg_spec, ipc_msg_spec,
msg_types, msg_types,
@ -759,7 +761,7 @@ def chk_pld_type(
) )
_enc = msgpack.Encoder() _enc = msgpack.Encoder()
_dec = msgpack.Decoder( _dec = msgpack.Decoder(
type=ipc_msg_spec or Any, # like `PayloadMsg[Any]` type=ipc_msg_spec or Any, # like `Msg[Any]`
) )
assert ( assert (
@ -804,7 +806,7 @@ def chk_pld_type(
'cid': '666', 'cid': '666',
'pld': pld, 'pld': pld,
} }
enc_msg: PayloadMsg = typedef(**kwargs) enc_msg: Msg = typedef(**kwargs)
_wire_bytes: bytes = _enc.encode(enc_msg) _wire_bytes: bytes = _enc.encode(enc_msg)
wire_bytes: bytes = codec.enc.encode(enc_msg) wire_bytes: bytes = codec.enc.encode(enc_msg)
@ -881,16 +883,25 @@ def test_limit_msgspec():
debug_mode=True debug_mode=True
): ):
# ensure we can round-trip a boxing `PayloadMsg` # ensure we can round-trip a boxing `Msg`
assert chk_pld_type( assert chk_pld_type(
payload_spec=Any, # Msg,
pld=None, Any,
None,
expect_roundtrip=True, expect_roundtrip=True,
) )
# TODO: don't need this any more right since
# `msgspec>=0.15` has the nice generics stuff yah??
#
# manually override the type annot of the payload
# field and ensure it propagates to all msg-subtypes.
# Msg.__annotations__['pld'] = Any
# verify that a mis-typed payload value won't decode # verify that a mis-typed payload value won't decode
assert not chk_pld_type( assert not chk_pld_type(
payload_spec=int, # Msg,
int,
pld='doggy', pld='doggy',
) )
@ -902,16 +913,18 @@ def test_limit_msgspec():
value: Any value: Any
assert not chk_pld_type( assert not chk_pld_type(
payload_spec=CustomPayload, # Msg,
CustomPayload,
pld='doggy', pld='doggy',
) )
assert chk_pld_type( assert chk_pld_type(
payload_spec=CustomPayload, # Msg,
CustomPayload,
pld=CustomPayload(name='doggy', value='urmom') pld=CustomPayload(name='doggy', value='urmom')
) )
# yah, we can `.pause_from_sync()` now! # uhh bc we can `.pause_from_sync()` now! :surfer:
# breakpoint() # breakpoint()
trio.run(main) trio.run(main)

View File

@ -1336,23 +1336,6 @@ def test_shield_pause(
child.expect(pexpect.EOF) child.expect(pexpect.EOF)
# TODO: better error for "non-ideal" usage from the root actor.
# -[ ] if called from an async scope emit a message that suggests
# using `await tractor.pause()` instead since it's less overhead
# (in terms of `greenback` and/or extra threads) and if it's from
# a sync scope suggest that usage must first call
# `ensure_portal()` in the (eventual parent) async calling scope?
def test_sync_pause_from_bg_task_in_root_actor_():
'''
When used from the root actor, normally we can only implicitly
support `.pause_from_sync()` from the main-parent-task (that
opens the runtime via `open_root_actor()`) since `greenback`
requires a `.ensure_portal()` call per `trio.Task` where it is
used.
'''
...
# TODO: needs ANSI code stripping tho, see `assert_before()` # above! # TODO: needs ANSI code stripping tho, see `assert_before()` # above!
def test_correct_frames_below_hidden(): def test_correct_frames_below_hidden():
''' '''

View File

@ -19,7 +19,7 @@ from tractor._testing import (
@pytest.fixture @pytest.fixture
def run_example_in_subproc( def run_example_in_subproc(
loglevel: str, loglevel: str,
testdir: pytest.Testdir, testdir,
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
): ):

View File

@ -49,7 +49,6 @@ from ._exceptions import (
ModuleNotExposed as ModuleNotExposed, ModuleNotExposed as ModuleNotExposed,
MsgTypeError as MsgTypeError, MsgTypeError as MsgTypeError,
RemoteActorError as RemoteActorError, RemoteActorError as RemoteActorError,
TransportClosed as TransportClosed,
) )
from .devx import ( from .devx import (
breakpoint as breakpoint, breakpoint as breakpoint,

View File

@ -906,59 +906,8 @@ class StreamOverrun(
''' '''
class TransportClosed(trio.BrokenResourceError): class TransportClosed(trio.ClosedResourceError):
''' "Underlying channel transport was closed prior to use"
IPC transport (protocol) connection was closed or broke and
indicates that the wrapping communication `Channel` can no longer
be used to send/receive msgs from the remote peer.
'''
def __init__(
self,
message: str,
loglevel: str = 'transport',
cause: BaseException|None = None,
raise_on_report: bool = False,
) -> None:
self.message: str = message
self._loglevel = loglevel
super().__init__(message)
if cause is not None:
self.__cause__ = cause
# flag to toggle whether the msg loop should raise
# the exc in its `TransportClosed` handler block.
self._raise_on_report = raise_on_report
def report_n_maybe_raise(
self,
message: str|None = None,
) -> None:
'''
Using the init-specified log level emit a logging report
for this error.
'''
message: str = message or self.message
# when a cause is set, slap it onto the log emission.
if cause := self.__cause__:
cause_tb_str: str = ''.join(
traceback.format_tb(cause.__traceback__)
)
message += (
f'{cause_tb_str}\n' # tb
f' {cause}\n' # exc repr
)
getattr(log, self._loglevel)(message)
# some errors we want to blow up from
# inside the RPC msg loop
if self._raise_on_report:
raise self from cause
class NoResult(RuntimeError): class NoResult(RuntimeError):

View File

@ -54,7 +54,7 @@ from tractor._exceptions import (
) )
from tractor.msg import ( from tractor.msg import (
_ctxvar_MsgCodec, _ctxvar_MsgCodec,
# _codec, XXX see `self._codec` sanity/debug checks _codec,
MsgCodec, MsgCodec,
types as msgtypes, types as msgtypes,
pretty_struct, pretty_struct,
@ -65,18 +65,8 @@ log = get_logger(__name__)
_is_windows = platform.system() == 'Windows' _is_windows = platform.system() == 'Windows'
def get_stream_addrs( def get_stream_addrs(stream: trio.SocketStream) -> tuple:
stream: trio.SocketStream # should both be IP sockets
) -> tuple[
tuple[str, int], # local
tuple[str, int], # remote
]:
'''
Return the `trio` streaming transport prot's socket-addrs for
both the local and remote sides as a pair.
'''
# rn, should both be IP sockets
lsockname = stream.socket.getsockname() lsockname = stream.socket.getsockname()
rsockname = stream.socket.getpeername() rsockname = stream.socket.getpeername()
return ( return (
@ -85,22 +75,17 @@ def get_stream_addrs(
) )
# from tractor.msg.types import MsgType # TODO: this should be our `Union[*msgtypes.__spec__]` now right?
# ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..? MsgType = TypeVar("MsgType")
# => BLEH, except can't bc prots must inherit typevar or param-spec
# vars.. # TODO: consider using a generic def and indexing with our eventual
MsgType = TypeVar('MsgType') # msg definition/types?
# - https://docs.python.org/3/library/typing.html#typing.Protocol
# - https://jcristharif.com/msgspec/usage.html#structs
# TODO: break up this mod into a subpkg so we can start adding new
# backends and move this type stuff into a dedicated file.. Bo
#
@runtime_checkable @runtime_checkable
class MsgTransport(Protocol[MsgType]): class MsgTransport(Protocol[MsgType]):
#
# ^-TODO-^ consider using a generic def and indexing with our
# eventual msg definition/types?
# - https://docs.python.org/3/library/typing.html#typing.Protocol
stream: trio.SocketStream stream: trio.SocketStream
drained: list[MsgType] drained: list[MsgType]
@ -135,9 +120,9 @@ class MsgTransport(Protocol[MsgType]):
... ...
# TODO: typing oddity.. not sure why we have to inherit here, but it # TODO: not sure why we have to inherit here, but it seems to be an
# seems to be an issue with `get_msg_transport()` returning # issue with ``get_msg_transport()`` returning a ``Type[Protocol]``;
# a `Type[Protocol]`; probably should make a `mypy` issue? # probably should make a `mypy` issue?
class MsgpackTCPStream(MsgTransport): class MsgpackTCPStream(MsgTransport):
''' '''
A ``trio.SocketStream`` delivering ``msgpack`` formatted data A ``trio.SocketStream`` delivering ``msgpack`` formatted data
@ -160,7 +145,7 @@ class MsgpackTCPStream(MsgTransport):
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
# #
# TODO: define this as a `Codec` struct which can be # TODO: define this as a `Codec` struct which can be
# overriden dynamically by the application/runtime? # overriden dynamically by the application/runtime.
codec: tuple[ codec: tuple[
Callable[[Any], Any]|None, # coder Callable[[Any], Any]|None, # coder
Callable[[type, Any], Any]|None, # decoder Callable[[type, Any], Any]|None, # decoder
@ -175,7 +160,7 @@ class MsgpackTCPStream(MsgTransport):
self._laddr, self._raddr = get_stream_addrs(stream) self._laddr, self._raddr = get_stream_addrs(stream)
# create read loop instance # create read loop instance
self._aiter_pkts = self._iter_packets() self._agen = self._iter_packets()
self._send_lock = trio.StrictFIFOLock() self._send_lock = trio.StrictFIFOLock()
# public i guess? # public i guess?
@ -189,12 +174,15 @@ class MsgpackTCPStream(MsgTransport):
# allow for custom IPC msg interchange format # allow for custom IPC msg interchange format
# dynamic override Bo # dynamic override Bo
self._task = trio.lowlevel.current_task() self._task = trio.lowlevel.current_task()
self._codec: MsgCodec = (
# XXX for ctxvar debug only! codec
# self._codec: MsgCodec = ( or
# codec _codec._ctxvar_MsgCodec.get()
# or )
# _codec._ctxvar_MsgCodec.get() # TODO: mask out before release?
# log.runtime(
# f'New {self} created with codec\n'
# f'codec: {self._codec}\n'
# ) # )
async def _iter_packets(self) -> AsyncGenerator[dict, None]: async def _iter_packets(self) -> AsyncGenerator[dict, None]:
@ -202,11 +190,6 @@ class MsgpackTCPStream(MsgTransport):
Yield `bytes`-blob decoded packets from the underlying TCP Yield `bytes`-blob decoded packets from the underlying TCP
stream using the current task's `MsgCodec`. stream using the current task's `MsgCodec`.
This is a streaming routine implemented as an async generator
func (which was the original design, but could be changed?)
and is allocated by a `.__call__()` inside `.__init__()` where
it is assigned to the `._aiter_pkts` attr.
''' '''
decodes_failed: int = 0 decodes_failed: int = 0
@ -221,82 +204,16 @@ class MsgpackTCPStream(MsgTransport):
# seem to be getting racy failures here on # seem to be getting racy failures here on
# arbiter/registry name subs.. # arbiter/registry name subs..
trio.BrokenResourceError, trio.BrokenResourceError,
) as trans_err:
loglevel = 'transport'
match trans_err:
# case (
# ConnectionResetError()
# ):
# loglevel = 'transport'
# peer actor (graceful??) TCP EOF but `tricycle`
# seems to raise a 0-bytes-read?
case ValueError() if (
'unclean EOF' in trans_err.args[0]
): ):
pass
# peer actor (task) prolly shutdown quickly due
# to cancellation
case trio.BrokenResourceError() if (
'Connection reset by peer' in trans_err.args[0]
):
pass
# unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn
# about it.
case _:
loglevel: str = 'warning'
raise TransportClosed( raise TransportClosed(
message=( f'transport {self} was already closed prior ro read'
f'IPC transport already closed by peer\n' )
f'x)> {type(trans_err)}\n'
f' |_{self}\n' if header == b'':
), raise TransportClosed(
loglevel=loglevel, f'transport {self} was already closed prior ro read'
) from trans_err
# XXX definitely can happen if transport is closed
# manually by another `trio.lowlevel.Task` in the
# same actor; we use this in some simulated fault
# testing for ex, but generally should never happen
# under normal operation!
#
# NOTE: as such we always re-raise this error from the
# RPC msg loop!
except trio.ClosedResourceError as closure_err:
raise TransportClosed(
message=(
f'IPC transport already manually closed locally?\n'
f'x)> {type(closure_err)} \n'
f' |_{self}\n'
),
loglevel='error',
raise_on_report=(
closure_err.args[0] == 'another task closed this fd'
or
closure_err.args[0] in ['another task closed this fd']
),
) from closure_err
# graceful TCP EOF disconnect
if header == b'':
raise TransportClosed(
message=(
f'IPC transport already gracefully closed\n'
f')>\n'
f'|_{self}\n'
),
loglevel='transport',
# cause=??? # handy or no?
) )
size: int
size, = struct.unpack("<I", header) size, = struct.unpack("<I", header)
log.transport(f'received header {size}') # type: ignore log.transport(f'received header {size}') # type: ignore
@ -308,20 +225,33 @@ class MsgpackTCPStream(MsgTransport):
# the current `MsgCodec`. # the current `MsgCodec`.
codec: MsgCodec = _ctxvar_MsgCodec.get() codec: MsgCodec = _ctxvar_MsgCodec.get()
# XXX for ctxvar debug only! # TODO: mask out before release?
# if self._codec.pld_spec != codec.pld_spec: if self._codec.pld_spec != codec.pld_spec:
# assert ( # assert (
# task := trio.lowlevel.current_task() # task := trio.lowlevel.current_task()
# ) is not self._task # ) is not self._task
# self._task = task # self._task = task
# self._codec = codec self._codec = codec
# log.runtime( log.runtime(
# f'Using new codec in {self}.recv()\n' f'Using new codec in {self}.recv()\n'
# f'codec: {self._codec}\n\n' f'codec: {self._codec}\n\n'
# f'msg_bytes: {msg_bytes}\n' f'msg_bytes: {msg_bytes}\n'
# ) )
yield codec.decode(msg_bytes) yield codec.decode(msg_bytes)
# TODO: remove, was only for orig draft impl
# testing.
#
# curr_codec: MsgCodec = _ctxvar_MsgCodec.get()
# obj = curr_codec.decode(msg_bytes)
# if (
# curr_codec is not
# _codec._def_msgspec_codec
# ):
# print(f'OBJ: {obj}\n')
#
# yield obj
# XXX NOTE: since the below error derives from # XXX NOTE: since the below error derives from
# `DecodeError` we need to catch is specially # `DecodeError` we need to catch is specially
# and always raise such that spec violations # and always raise such that spec violations
@ -365,8 +295,7 @@ class MsgpackTCPStream(MsgTransport):
msg: msgtypes.MsgType, msg: msgtypes.MsgType,
strict_types: bool = True, strict_types: bool = True,
hide_tb: bool = False, # hide_tb: bool = False,
) -> None: ) -> None:
''' '''
Send a msgpack encoded py-object-blob-as-msg over TCP. Send a msgpack encoded py-object-blob-as-msg over TCP.
@ -375,24 +304,21 @@ class MsgpackTCPStream(MsgTransport):
invalid msg type invalid msg type
''' '''
__tracebackhide__: bool = hide_tb # __tracebackhide__: bool = hide_tb
# XXX see `trio._sync.AsyncContextManagerMixin` for details
# on the `.acquire()`/`.release()` sequencing..
async with self._send_lock: async with self._send_lock:
# NOTE: lookup the `trio.Task.context`'s var for # NOTE: lookup the `trio.Task.context`'s var for
# the current `MsgCodec`. # the current `MsgCodec`.
codec: MsgCodec = _ctxvar_MsgCodec.get() codec: MsgCodec = _ctxvar_MsgCodec.get()
# XXX for ctxvar debug only! # TODO: mask out before release?
# if self._codec.pld_spec != codec.pld_spec: if self._codec.pld_spec != codec.pld_spec:
# self._codec = codec self._codec = codec
# log.runtime( log.runtime(
# f'Using new codec in {self}.send()\n' f'Using new codec in {self}.send()\n'
# f'codec: {self._codec}\n\n' f'codec: {self._codec}\n\n'
# f'msg: {msg}\n' f'msg: {msg}\n'
# ) )
if type(msg) not in msgtypes.__msg_types__: if type(msg) not in msgtypes.__msg_types__:
if strict_types: if strict_types:
@ -426,16 +352,6 @@ class MsgpackTCPStream(MsgTransport):
size: bytes = struct.pack("<I", len(bytes_data)) size: bytes = struct.pack("<I", len(bytes_data))
return await self.stream.send_all(size + bytes_data) return await self.stream.send_all(size + bytes_data)
# ?TODO? does it help ever to dynamically show this
# frame?
# try:
# <the-above_code>
# except BaseException as _err:
# err = _err
# if not isinstance(err, MsgTypeError):
# __tracebackhide__: bool = False
# raise
@property @property
def laddr(self) -> tuple[str, int]: def laddr(self) -> tuple[str, int]:
return self._laddr return self._laddr
@ -445,7 +361,7 @@ class MsgpackTCPStream(MsgTransport):
return self._raddr return self._raddr
async def recv(self) -> Any: async def recv(self) -> Any:
return await self._aiter_pkts.asend(None) return await self._agen.asend(None)
async def drain(self) -> AsyncIterator[dict]: async def drain(self) -> AsyncIterator[dict]:
''' '''
@ -462,7 +378,7 @@ class MsgpackTCPStream(MsgTransport):
yield msg yield msg
def __aiter__(self): def __aiter__(self):
return self._aiter_pkts return self._agen
def connected(self) -> bool: def connected(self) -> bool:
return self.stream.socket.fileno() != -1 return self.stream.socket.fileno() != -1
@ -517,7 +433,7 @@ class Channel:
# set after handshake - always uid of far end # set after handshake - always uid of far end
self.uid: tuple[str, str]|None = None self.uid: tuple[str, str]|None = None
self._aiter_msgs = self._iter_msgs() self._agen = self._aiter_recv()
self._exc: Exception|None = None # set if far end actor errors self._exc: Exception|None = None # set if far end actor errors
self._closed: bool = False self._closed: bool = False
@ -581,6 +497,8 @@ class Channel:
) )
return self._transport return self._transport
# TODO: something simliar at the IPC-`Context`
# level so as to support
@cm @cm
def apply_codec( def apply_codec(
self, self,
@ -599,7 +517,6 @@ class Channel:
finally: finally:
self._transport.codec = orig self._transport.codec = orig
# TODO: do a .src/.dst: str for maddrs?
def __repr__(self) -> str: def __repr__(self) -> str:
if not self._transport: if not self._transport:
return '<Channel with inactive transport?>' return '<Channel with inactive transport?>'
@ -643,43 +560,27 @@ class Channel:
) )
return transport return transport
# TODO: something like,
# `pdbp.hideframe_on(errors=[MsgTypeError])`
# instead of the `try/except` hack we have rn..
# seems like a pretty useful thing to have in general
# along with being able to filter certain stack frame(s / sets)
# possibly based on the current log-level?
async def send( async def send(
self, self,
payload: Any, payload: Any,
hide_tb: bool = False, # hide_tb: bool = False,
) -> None: ) -> None:
''' '''
Send a coded msg-blob over the transport. Send a coded msg-blob over the transport.
''' '''
__tracebackhide__: bool = hide_tb # __tracebackhide__: bool = hide_tb
try:
log.transport( log.transport(
'=> send IPC msg:\n\n' '=> send IPC msg:\n\n'
f'{pformat(payload)}\n' f'{pformat(payload)}\n'
) ) # type: ignore
# assert self._transport # but why typing? assert self._transport
await self._transport.send( await self._transport.send(
payload, payload,
hide_tb=hide_tb, # hide_tb=hide_tb,
) )
except BaseException as _err:
err = _err # bind for introspection
if not isinstance(_err, MsgTypeError):
# assert err
__tracebackhide__: bool = False
else:
assert err.cid
raise
async def recv(self) -> Any: async def recv(self) -> Any:
assert self._transport assert self._transport
@ -716,11 +617,8 @@ class Channel:
await self.aclose(*args) await self.aclose(*args)
def __aiter__(self): def __aiter__(self):
return self._aiter_msgs return self._agen
# ?TODO? run any reconnection sequence?
# -[ ] prolly should be impl-ed as deco-API?
#
# async def _reconnect(self) -> None: # async def _reconnect(self) -> None:
# """Handle connection failures by polling until a reconnect can be # """Handle connection failures by polling until a reconnect can be
# established. # established.
@ -738,6 +636,7 @@ class Channel:
# else: # else:
# log.transport("Stream connection re-established!") # log.transport("Stream connection re-established!")
# # TODO: run any reconnection sequence
# # on_recon = self._recon_seq # # on_recon = self._recon_seq
# # if on_recon: # # if on_recon:
# # await on_recon(self) # # await on_recon(self)
@ -751,17 +650,11 @@ class Channel:
# " for re-establishment") # " for re-establishment")
# await trio.sleep(1) # await trio.sleep(1)
async def _iter_msgs( async def _aiter_recv(
self self
) -> AsyncGenerator[Any, None]: ) -> AsyncGenerator[Any, None]:
''' '''
Yield `MsgType` IPC msgs decoded and deliverd from Async iterate items from underlying stream.
an underlying `MsgTransport` protocol.
This is a streaming routine alo implemented as an async-gen
func (same a `MsgTransport._iter_pkts()`) gets allocated by
a `.__call__()` inside `.__init__()` where it is assigned to
the `._aiter_msgs` attr.
''' '''
assert self._transport assert self._transport
@ -787,6 +680,15 @@ class Channel:
case _: case _:
yield msg yield msg
# TODO: if we were gonna do this it should be
# done up at the `MsgStream` layer!
#
# sent = yield item
# if sent is not None:
# # optimization, passing None through all the
# # time is pointless
# await self._transport.send(sent)
except trio.BrokenResourceError: except trio.BrokenResourceError:
# if not self._autorecon: # if not self._autorecon:

View File

@ -68,7 +68,7 @@ from .msg import (
MsgCodec, MsgCodec,
PayloadT, PayloadT,
NamespacePath, NamespacePath,
# pretty_struct, pretty_struct,
_ops as msgops, _ops as msgops,
) )
from tractor.msg.types import ( from tractor.msg.types import (
@ -89,16 +89,6 @@ if TYPE_CHECKING:
log = get_logger('tractor') log = get_logger('tractor')
# ?TODO? move to a `tractor.lowlevel._rpc` with the below
# func-type-cases implemented "on top of" `@context` defs:
# -[ ] std async func helper decorated with `@rpc_func`?
# -[ ] `Portal.open_stream_from()` with async-gens?
# |_ possibly a duplex form of this with a
# `sent_from_peer = yield send_to_peer` form, which would require
# syncing the send/recv side with possibly `.receive_nowait()`
# on each `yield`?
# -[ ] some kinda `@rpc_acm` maybe that does a fixture style with
# user only defining a single-`yield` generator-func?
async def _invoke_non_context( async def _invoke_non_context(
actor: Actor, actor: Actor,
cancel_scope: CancelScope, cancel_scope: CancelScope,
@ -118,9 +108,8 @@ async def _invoke_non_context(
] = trio.TASK_STATUS_IGNORED, ] = trio.TASK_STATUS_IGNORED,
): ):
__tracebackhide__: bool = True __tracebackhide__: bool = True
cs: CancelScope|None = None # ref when activated
# ?TODO? can we unify this with the `context=True` impl below? # TODO: can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro): if inspect.isasyncgen(coro):
await chan.send( await chan.send(
StartAck( StartAck(
@ -171,6 +160,10 @@ async def _invoke_non_context(
functype='asyncgen', functype='asyncgen',
) )
) )
# XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must
# manualy construct the response dict-packet-responses as
# above
with cancel_scope as cs: with cancel_scope as cs:
ctx._scope = cs ctx._scope = cs
task_status.started(ctx) task_status.started(ctx)
@ -182,13 +175,15 @@ async def _invoke_non_context(
await chan.send( await chan.send(
Stop(cid=cid) Stop(cid=cid)
) )
# simplest function/method request-response pattern
# XXX: in the most minimally used case, just a scheduled internal runtime
# call to `Actor._cancel_task()` from the ctx-peer task since we
# don't (yet) have a dedicated IPC msg.
# ------ - ------
else: else:
# regular async function/method
# XXX: possibly just a scheduled `Actor._cancel_task()`
# from a remote request to cancel some `Context`.
# ------ - ------
# TODO: ideally we unify this with the above `context=True`
# block such that for any remote invocation ftype, we
# always invoke the far end RPC task scheduling the same
# way: using the linked IPC context machinery.
failed_resp: bool = False failed_resp: bool = False
try: try:
ack = StartAck( ack = StartAck(
@ -359,15 +354,8 @@ async def _errors_relayed_via_ipc(
# channel. # channel.
task_status.started(err) task_status.started(err)
# always propagate KBIs at the sys-process level. # always reraise KBIs so they propagate at the sys-process level.
if ( if isinstance(err, KeyboardInterrupt):
isinstance(err, KeyboardInterrupt)
# ?TODO? except when running in asyncio mode?
# |_ wut if you want to open a `@context` FROM an
# infected_aio task?
# and not actor.is_infected_aio()
):
raise raise
# RPC task bookeeping. # RPC task bookeeping.
@ -470,6 +458,7 @@ async def _invoke(
# tb: TracebackType = None # tb: TracebackType = None
cancel_scope = CancelScope() cancel_scope = CancelScope()
cs: CancelScope|None = None # ref when activated
ctx = actor.get_context( ctx = actor.get_context(
chan=chan, chan=chan,
cid=cid, cid=cid,
@ -618,8 +607,6 @@ async def _invoke(
# `@context` marked RPC function. # `@context` marked RPC function.
# - `._portal` is never set. # - `._portal` is never set.
try: try:
tn: trio.Nursery
rpc_ctx_cs: CancelScope
async with ( async with (
trio.open_nursery() as tn, trio.open_nursery() as tn,
msgops.maybe_limit_plds( msgops.maybe_limit_plds(
@ -629,7 +616,7 @@ async def _invoke(
), ),
): ):
ctx._scope_nursery = tn ctx._scope_nursery = tn
rpc_ctx_cs = ctx._scope = tn.cancel_scope ctx._scope = tn.cancel_scope
task_status.started(ctx) task_status.started(ctx)
# TODO: better `trionics` tooling: # TODO: better `trionics` tooling:
@ -655,7 +642,7 @@ async def _invoke(
# itself calls `ctx._maybe_cancel_and_set_remote_error()` # itself calls `ctx._maybe_cancel_and_set_remote_error()`
# which cancels the scope presuming the input error # which cancels the scope presuming the input error
# is not a `.cancel_acked` pleaser. # is not a `.cancel_acked` pleaser.
if rpc_ctx_cs.cancelled_caught: if ctx._scope.cancelled_caught:
our_uid: tuple = actor.uid our_uid: tuple = actor.uid
# first check for and raise any remote error # first check for and raise any remote error
@ -665,7 +652,9 @@ async def _invoke(
if re := ctx._remote_error: if re := ctx._remote_error:
ctx._maybe_raise_remote_err(re) ctx._maybe_raise_remote_err(re)
if rpc_ctx_cs.cancel_called: cs: CancelScope = ctx._scope
if cs.cancel_called:
canceller: tuple = ctx.canceller canceller: tuple = ctx.canceller
explain: str = f'{ctx.side!r}-side task was cancelled by ' explain: str = f'{ctx.side!r}-side task was cancelled by '
@ -691,14 +680,8 @@ async def _invoke(
elif canceller == ctx.chan.uid: elif canceller == ctx.chan.uid:
explain += f'its {ctx.peer_side!r}-side peer' explain += f'its {ctx.peer_side!r}-side peer'
elif canceller == our_uid:
explain += 'itself'
elif canceller:
explain += 'a remote peer'
else: else:
explain += 'an unknown cause?' explain += 'a remote peer'
explain += ( explain += (
add_div(message=explain) add_div(message=explain)
@ -928,10 +911,7 @@ async def process_messages(
f'IPC msg from peer\n' f'IPC msg from peer\n'
f'<= {chan.uid}\n\n' f'<= {chan.uid}\n\n'
# TODO: use of the pprinting of structs is # TODO: avoid fmting depending on loglevel for perf?
# FRAGILE and should prolly not be
#
# avoid fmting depending on loglevel for perf?
# -[ ] specifically `pretty_struct.pformat()` sub-call..? # -[ ] specifically `pretty_struct.pformat()` sub-call..?
# - how to only log-level-aware actually call this? # - how to only log-level-aware actually call this?
# -[ ] use `.msg.pretty_struct` here now instead! # -[ ] use `.msg.pretty_struct` here now instead!
@ -1197,7 +1177,7 @@ async def process_messages(
parent_chan=chan, parent_chan=chan,
) )
except TransportClosed as tc: except TransportClosed:
# channels "breaking" (for TCP streams by EOF or 104 # channels "breaking" (for TCP streams by EOF or 104
# connection-reset) is ok since we don't have a teardown # connection-reset) is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out of # handshake for them (yet) and instead we simply bail out of
@ -1205,20 +1185,12 @@ async def process_messages(
# up.. # up..
# #
# TODO: maybe add a teardown handshake? and, # TODO: maybe add a teardown handshake? and,
# -[x] don't show this msg if it's an ephemeral discovery ep call? # -[ ] don't show this msg if it's an ephemeral discovery ep call?
# |_ see the below `.report_n_maybe_raise()` impl as well as
# tc-exc input details in `MsgpackTCPStream._iter_pkts()`
# for different read-failure cases.
# -[ ] figure out how this will break with other transports? # -[ ] figure out how this will break with other transports?
tc.report_n_maybe_raise( log.runtime(
message=( f'IPC channel closed abruptly\n'
f'peer IPC channel closed abruptly?\n\n' f'<=x peer: {chan.uid}\n'
f'<=x {chan}\n' f' |_{chan.raddr}\n'
f' |_{chan.raddr}\n\n'
)
+
tc.message
) )
# transport **WAS** disconnected # transport **WAS** disconnected
@ -1266,7 +1238,7 @@ async def process_messages(
'Exiting IPC msg loop with final msg\n\n' 'Exiting IPC msg loop with final msg\n\n'
f'<= peer: {chan.uid}\n' f'<= peer: {chan.uid}\n'
f' |_{chan}\n\n' f' |_{chan}\n\n'
# f'{pretty_struct.pformat(msg)}' f'{pretty_struct.pformat(msg)}'
) )
log.runtime(message) log.runtime(message)

View File

@ -54,12 +54,11 @@ LOG_FORMAT = (
DATE_FORMAT = '%b %d %H:%M:%S' DATE_FORMAT = '%b %d %H:%M:%S'
# FYI, ERROR is 40 # FYI, ERROR is 40
# TODO: use a `bidict` to avoid the :155 check?
CUSTOM_LEVELS: dict[str, int] = { CUSTOM_LEVELS: dict[str, int] = {
'TRANSPORT': 5, 'TRANSPORT': 5,
'RUNTIME': 15, 'RUNTIME': 15,
'DEVX': 17, 'DEVX': 17,
'CANCEL': 22, 'CANCEL': 18,
'PDB': 500, 'PDB': 500,
} }
STD_PALETTE = { STD_PALETTE = {
@ -148,8 +147,6 @@ class StackLevelAdapter(LoggerAdapter):
Delegate a log call to the underlying logger, after adding Delegate a log call to the underlying logger, after adding
contextual information from this adapter instance. contextual information from this adapter instance.
NOTE: all custom level methods (above) delegate to this!
''' '''
if self.isEnabledFor(level): if self.isEnabledFor(level):
stacklevel: int = 3 stacklevel: int = 3

View File

@ -34,9 +34,6 @@ from pprint import (
saferepr, saferepr,
) )
from tractor.log import get_logger
log = get_logger()
# TODO: auto-gen type sig for input func both for # TODO: auto-gen type sig for input func both for
# type-msgs and logging of RPC tasks? # type-msgs and logging of RPC tasks?
# taken and modified from: # taken and modified from:
@ -146,13 +143,7 @@ def pformat(
else: # the `pprint` recursion-safe format: else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
try:
val_str: str = saferepr(v) val_str: str = saferepr(v)
except Exception:
log.exception(
'Failed to `saferepr({type(struct)})` !?\n'
)
return _Struct.__repr__(struct)
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg! # TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n') obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
@ -203,20 +194,12 @@ class Struct(
return sin_props return sin_props
pformat = pformat pformat = pformat
# __repr__ = pformat
# __str__ = __repr__ = pformat # __str__ = __repr__ = pformat
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering # TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty? # inside a known tty?
# def __repr__(self) -> str: # def __repr__(self) -> str:
# ... # ...
def __repr__(self) -> str: __repr__ = pformat
try:
return pformat(self)
except Exception:
log.exception(
f'Failed to `pformat({type(self)})` !?\n'
)
return _Struct.__repr__(self)
def copy( def copy(
self, self,

View File

@ -156,12 +156,11 @@ class BroadcastState(Struct):
class BroadcastReceiver(ReceiveChannel): class BroadcastReceiver(ReceiveChannel):
''' '''
A memory receive channel broadcaster which is non-lossy for A memory receive channel broadcaster which is non-lossy for the
the fastest consumer. fastest consumer.
Additional consumer tasks can receive all produced values by Additional consumer tasks can receive all produced values by registering
registering with ``.subscribe()`` and receiving from the new with ``.subscribe()`` and receiving from the new instance it delivers.
instance it delivers.
''' '''
def __init__( def __init__(

View File

@ -18,12 +18,8 @@
Async context manager primitives with hard ``trio``-aware semantics Async context manager primitives with hard ``trio``-aware semantics
''' '''
from __future__ import annotations from contextlib import asynccontextmanager as acm
from contextlib import (
asynccontextmanager as acm,
)
import inspect import inspect
from types import ModuleType
from typing import ( from typing import (
Any, Any,
AsyncContextManager, AsyncContextManager,
@ -34,16 +30,13 @@ from typing import (
Optional, Optional,
Sequence, Sequence,
TypeVar, TypeVar,
TYPE_CHECKING,
) )
import trio import trio
from tractor._state import current_actor from tractor._state import current_actor
from tractor.log import get_logger from tractor.log import get_logger
if TYPE_CHECKING:
from tractor import ActorNursery
log = get_logger(__name__) log = get_logger(__name__)
@ -53,10 +46,8 @@ T = TypeVar("T")
@acm @acm
async def maybe_open_nursery( async def maybe_open_nursery(
nursery: trio.Nursery|ActorNursery|None = None, nursery: trio.Nursery | None = None,
shield: bool = False, shield: bool = False,
lib: ModuleType = trio,
) -> AsyncGenerator[trio.Nursery, Any]: ) -> AsyncGenerator[trio.Nursery, Any]:
''' '''
Create a new nursery if None provided. Create a new nursery if None provided.
@ -67,12 +58,13 @@ async def maybe_open_nursery(
if nursery is not None: if nursery is not None:
yield nursery yield nursery
else: else:
async with lib.open_nursery() as nursery: async with trio.open_nursery() as nursery:
nursery.cancel_scope.shield = shield nursery.cancel_scope.shield = shield
yield nursery yield nursery
async def _enter_and_wait( async def _enter_and_wait(
mngr: AsyncContextManager[T], mngr: AsyncContextManager[T],
unwrapped: dict[int, T], unwrapped: dict[int, T],
all_entered: trio.Event, all_entered: trio.Event,
@ -99,6 +91,7 @@ async def _enter_and_wait(
@acm @acm
async def gather_contexts( async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]], mngrs: Sequence[AsyncContextManager[T]],
) -> AsyncGenerator[ ) -> AsyncGenerator[
@ -109,17 +102,15 @@ async def gather_contexts(
None, None,
]: ]:
''' '''
Concurrently enter a sequence of async context managers (acms), Concurrently enter a sequence of async context managers, each in
each from a separate `trio` task and deliver the unwrapped a separate ``trio`` task and deliver the unwrapped values in the
`yield`-ed values in the same order once all managers have entered. same order once all managers have entered. On exit all contexts are
subsequently and concurrently exited.
On exit, all acms are subsequently and concurrently exited. This function is somewhat similar to common usage of
``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in
This function is somewhat similar to a batch of non-blocking combo with ``asyncio.gather()`` except the managers are concurrently
calls to `contextlib.AsyncExitStack.enter_async_context()` entered and exited, and cancellation just works.
(inside a loop) *in combo with* a `asyncio.gather()` to get the
`.__aenter__()`-ed values, except the managers are both
concurrently entered and exited and *cancellation just works*(R).
''' '''
seed: int = id(mngrs) seed: int = id(mngrs)
@ -219,10 +210,9 @@ async def maybe_open_context(
) -> AsyncIterator[tuple[bool, T]]: ) -> AsyncIterator[tuple[bool, T]]:
''' '''
Maybe open an async-context-manager (acm) if there is not already Maybe open a context manager if there is not already a _Cached
a `_Cached` version for the provided (input) `key` for *this* actor. version for the provided ``key`` for *this* actor. Return the
_Cached instance on a _Cache hit.
Return the `_Cached` instance on a _Cache hit.
''' '''
fid = id(acm_func) fid = id(acm_func)
@ -283,13 +273,8 @@ async def maybe_open_context(
else: else:
_Cache.users += 1 _Cache.users += 1
log.runtime( log.runtime(
f'Re-using cached resource for user {_Cache.users}\n\n' f'Reusing resource for `_Cache` user {_Cache.users}\n\n'
f'{ctx_key!r} -> {type(yielded)}\n' f'{ctx_key!r} -> {yielded!r}\n'
# TODO: make this work with values but without
# `msgspec.Struct` causing frickin crashes on field-type
# lookups..
# f'{ctx_key!r} -> {yielded!r}\n'
) )
lock.release() lock.release()
yield True, yielded yield True, yielded