Compare commits

..

11 Commits

Author SHA1 Message Date
Tyler Goodlet a5a0e6854b Use new `Msg[Co]Dec` repr meths in `._exceptions`
Particularly when logging around `MsgTypeError`s.

Other:
- make `_raise_from_unexpected_msg()`'s `expect_msg` a non-default value
  arg, must always be passed by caller.
- drop `'canceller'` from `_body_fields` ow it shows up twice for ctxc.
- use `.msg.pretty_struct.pformat()`.
- parameterize `RemoteActorError.reprol()` (repr-one-line method) to
  show `RemoteActorError[<self.boxed_type_str>]( ..` to make obvi
  the boxed remote error type.
- re-impl `.boxed_type_str` as `str`-casting the `.boxed_type` value
  which is guaranteed to render non-`None`.
2024-04-26 13:09:38 -04:00
Tyler Goodlet c383978402 Add more useful `MsgDec.__repr__()`
Basically exact same as that for `MsgCodec` with the `.spec` displayed
via a better (maybe multi-line) `.spec_str: str` generated from a common
new set of helper mod funcs factored out msg-codec meths:
- `mk_msgspec_table()` to gen a `MsgType` name -> msg table.
- `pformat_msgspec()` to `str`-ify said table values nicely.q

Also add a new `MsgCodec.msg_spec_str: str` prop which delegates to the
above for the same.
2024-04-26 12:49:37 -04:00
Tyler Goodlet 08fcd3fb03 Mk `.msg.pretty_struct.Struct.pformat()` a mod func
More along the lines of `msgspec.struct` and also far more useful
internally for pprinting `MsgTypes`. Of course add method aliases.
2024-04-25 20:00:13 -04:00
Tyler Goodlet adba454d1d Use `Context.[peer_]side` in ctxc messages 2024-04-25 16:19:39 -04:00
Tyler Goodlet 4bab998ff9 Add `Context.peer_side: str` property, mk static-meth private. 2024-04-25 12:38:05 -04:00
Tyler Goodlet c25c77c573 Flip back `StartAck` timeout to `inf`.. 2024-04-25 12:36:14 -04:00
Tyler Goodlet 188ff0e0e5 Another `._rpc` mod passthrough
- tweaking logging to include more `MsgType` dumps on IPC faults.
- removing some commented cruft.
- comment formatting / cleanups / add-ons.
- more type annots.
- fill out some TODO content.
2024-04-25 12:33:10 -04:00
Tyler Goodlet 6b30c86eca Try out `msgspec` encode-buffer optimization
As per the reco:
https://jcristharif.com/msgspec/perf-tips.html#reusing-an-output-buffe

BUT, seems to cause this error in `pikerd`..

`BufferError: Existing exports of data: object cannot be re-sized`

Soo no idea? Maybe there's a tweak needed that we can glean from
tests/examples in the `msgspec` repo?

Disabling for now.
2024-04-24 13:07:05 -04:00
Tyler Goodlet 6aa52417ef Set `Context._stream` in `Portal.open_stream_from()`.. 2024-04-24 12:43:08 -04:00
Tyler Goodlet 18e97a8f9a Use `Context._stream` in `_raise_from_unexpected_msg()`
Instead of expecting it to be passed in (as it was prior), when
determining if a `Stop` msg is a valid end-of-channel signal use the
`ctx._stream: MsgStream|None` attr which **must** be set by any stream
opening API; either of:
- `Context.open_stream()`
- `Portal.open_stream_from()`

Adjust the case block logic to match with fallthrough from any EoC to
a closed error if necessary. Change the `_type: str` to match the
failing IPC-prim name in the tail case we raise a `MessagingError`.

Other:
- move `.sender: tuple` uid attr up to `RemoteActorError` since `Error`
  optionally defines it as a field and for boxed `StreamOverrun`s (an
  ignore case we check for in the runtime during cancellation) we want
  it readable from the boxing rae.
- drop still unused `InternalActorError`.
2024-04-24 12:42:05 -04:00
Tyler Goodlet 5eb9144921 First draft "payload receiver in a new `.msg._ops`
As per much tinkering, re-designs and preceding rubber-ducking via many
"commit msg novelas", **finally** this adds the (hopefully) final
missing layer for typed msg safety: `tractor.msg._ops.PldRx`

(or `PayloadReceiver`? haven't decided how verbose to go..)

Design justification summary:
      ------ - ------
- need a way to be as-close-as-possible to the `tractor`-application
  such that when `MsgType.pld: PayloadT` validation takes place, it is
  straightforward and obvious how user code can decide to handle any
  resulting `MsgTypeError`.
- there should be a common and optional-yet-modular way to modify
  **how** data delivered via IPC (possibly embedded as user defined,
  type-constrained `.pld: msgspec.Struct`s) can be handled and processed
  during fault conditions and/or IPC "msg attacks".
- support for nested type constraints within a `MsgType.pld` field
  should be simple to define, implement and understand at runtime.
- a layer between the app-level IPC primitive APIs
  (`Context`/`MsgStream`) and application-task code (consumer code of
  those APIs) should be easily customized and prove-to-be-as-such
  through demonstrably rigorous internal (sub-sys) use!
  -> eg. via seemless runtime RPC eps support like `Actor.cancel()`
  -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt
    dialog prot, via a dead simple payload-as-ctl-msg-spec.

There are some fairly detailed doc strings included so I won't duplicate
that content, the majority of the work here is actually somewhat of
a factoring of many similar blocks that are doing more or less the same
`msg = await Context._rx_chan.receive()` with boilerplate for
`Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new
`PldRx` basically provides a shim layer for this common "receive msg,
decode its payload, yield it up to the consuming app task" by pairing
the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API
internals to use **one** API instead of re-implementing the same pattern
all over the place XD

`PldRx` breakdown
 ------ - ------
- for now only expects a `._msgdec: MsgDec` which allows for
  override-able `MsgType.pld` validation and most obviously used in
  the impl of `.dec_msg()`, the decode message method.
- provides multiple mem-chan receive options including:
 |_ `.recv_pld()` which does the e2e operation of receiving a payload
    item.
 |_ a sync `.recv_pld_nowait()` version.
 |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the
    shuttling `MsgType` as well as it's `.pld` body for use cases where
    info on both is important (eg. draining a `MsgStream`).

Dirty internal changeover/implementation deatz:
             ------ - ------
- obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield
  logic:
 - `MsgStream.receive[_nowait]()` delegating instead to the equivalent
   `PldRx.recv_pld[_nowait]()`.
 - add `Context._pld_rx: PldRx`, created and passed in by
   `mk_context()`; use it for the `.started()` -> `first: Started`
   retrieval inside `open_context_from_portal()`.
 - all the relevant `Portal` invocation methods: `.result()`,
   `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()`
   and `.Portal_return_once()` outright Bo
- rename `Context.ctx._recv_chan` -> `._rx_chan`.
- add detailed `Context._scope` info for logging whether or not it's
  cancelled inside `_maybe_cancel_and_set_remote_error()`.
- move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()`
  since it's really not necessarily ctx specific per say, and it does
  kinda fit with "msg operations" more abstractly ;)
2024-04-24 00:59:41 -04:00
9 changed files with 348 additions and 280 deletions

View File

@ -472,13 +472,17 @@ class Context:
return 'parent' if self._portal else 'child'
@staticmethod
def peer_side(side: str) -> str:
def _peer_side(side: str) -> str:
match side:
case 'child':
return 'parent'
case 'parent':
return 'child'
@property
def peer_side(self) -> str:
return self._peer_side(self.side)
# TODO: remove stat!
# -[ ] re-implement the `.experiemental._pubsub` stuff
# with `MsgStream` and that should be last usage?
@ -512,9 +516,7 @@ class Context:
equiv of a `StopIteration`.
'''
await self.chan.send(
Stop(cid=self.cid)
)
await self.chan.send(Stop(cid=self.cid))
def _maybe_cancel_and_set_remote_error(
self,
@ -593,7 +595,6 @@ class Context:
# TODO: never do this right?
# if self._remote_error:
# return
peer_side: str = self.peer_side(self.side)
# XXX: denote and set the remote side's error so that
# after we cancel whatever task is the opener of this
@ -601,7 +602,7 @@ class Context:
# appropriately.
log.runtime(
'Setting remote error for ctx\n\n'
f'<= {peer_side!r}: {self.chan.uid}\n'
f'<= {self.peer_side!r}: {self.chan.uid}\n'
f'=> {self.side!r}\n\n'
f'{error}'
)
@ -623,9 +624,8 @@ class Context:
elif isinstance(error, MsgTypeError):
msgerr = True
peer_side: str = self.peer_side(self.side)
log.error(
f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n'
f'IPC dialog error due to msg-type caused by {self.peer_side!r} side\n\n'
f'{error}\n'
f'{pformat(self)}\n'
@ -1067,12 +1067,12 @@ class Context:
except trio.EndOfChannel as eoc:
if (
eoc
and stream.closed
and
stream.closed
):
# sanity, can remove?
assert eoc is stream._eoc
# from .devx import pause
# await pause()
log.warning(
'Stream was terminated by EoC\n\n'
# NOTE: won't show the error <Type> but
@ -1644,10 +1644,9 @@ class Context:
side: str = self.side
if side == 'child':
assert not self._portal
peer_side: str = self.peer_side(side)
flow_body: str = (
f'<= peer {peer_side!r}: {from_uid}\n'
f'<= peer {self.peer_side!r}: {from_uid}\n'
f' |_<{nsf}()>\n\n'
f'=> {side!r}: {self._task}\n'
@ -1665,7 +1664,7 @@ class Context:
log_meth = log.runtime
log_meth(
f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n'
f'Delivering IPC ctx error from {self.peer_side!r} to {side!r} task\n\n'
f'{flow_body}'
@ -2330,7 +2329,7 @@ async def open_context_from_portal(
and ctx.cancel_acked
):
log.cancel(
'Context cancelled by caller task\n'
'Context cancelled by {ctx.side!r}-side task\n'
f'|_{ctx._task}\n\n'
f'{repr(scope_err)}\n'
@ -2364,6 +2363,7 @@ async def open_context_from_portal(
None,
)
def mk_context(
chan: Channel,
cid: str,

View File

@ -54,6 +54,7 @@ from tractor.msg import (
from tractor.msg.pretty_struct import (
iter_fields,
Struct,
pformat as struct_format,
)
if TYPE_CHECKING:
@ -108,6 +109,10 @@ _body_fields: list[str] = list(
'relay_path',
'_msg_dict',
'cid',
# since only ctxc should show it but `Error` does
# have it as an optional field.
'canceller',
}
)
@ -382,6 +387,9 @@ class RemoteActorError(Exception):
'''
Error type raised by original remote faulting actor.
When the error has only been relayed a single actor-hop
this will be the same as the `.boxed_type`.
'''
if self._src_type is None:
self._src_type = get_err_type(
@ -396,7 +404,8 @@ class RemoteActorError(Exception):
String-name of the (last hop's) boxed error type.
'''
return self._ipc_msg.boxed_type_str
bt: Type[BaseException] = self.boxed_type
return str(bt.__name__)
@property
def boxed_type(self) -> str:
@ -492,7 +501,11 @@ class RemoteActorError(Exception):
'''
# TODO: use this matryoshka emjoi XD
# => 🪆
reprol_str: str = f'{type(self).__name__}('
reprol_str: str = (
f'{type(self).__name__}' # type name
f'[{self.boxed_type_str}]' # parameterized by boxed type
'(' # init-style look
)
_repr: str = self._mk_fields_str(
self.reprol_fields,
end_char=' ',
@ -532,7 +545,8 @@ class RemoteActorError(Exception):
self,
) -> BaseException:
'''
Unpack the inner-most source error from it's original IPC msg data.
Unpack the inner-most source error from it's original IPC
msg data.
We attempt to reconstruct (as best as we can) the original
`Exception` from as it would have been raised in the
@ -570,6 +584,14 @@ class RemoteActorError(Exception):
# # boxed_type=get_type_ref(..
# raise NotImplementedError
@property
def sender(self) -> tuple[str, str]|None:
if (
(msg := self._ipc_msg)
and (value := msg.sender)
):
return tuple(value)
class ContextCancelled(RemoteActorError):
'''
@ -644,8 +666,8 @@ class MsgTypeError(
- `Yield`
- TODO: any embedded `.pld` type defined by user code?
Normally the source of an error is re-raised from some `.msg._codec`
decode which itself raises in a backend interchange
Normally the source of an error is re-raised from some
`.msg._codec` decode which itself raises in a backend interchange
lib (eg. a `msgspec.ValidationError`).
'''
@ -734,20 +756,6 @@ class StreamOverrun(
handled by app code using `MsgStream.send()/.receive()`.
'''
@property
def sender(self) -> tuple[str, str] | None:
value = self._ipc_msg.sender
if value:
return tuple(value)
# class InternalActorError(RemoteActorError):
# '''
# Boxed (Remote) internal `tractor` error indicating failure of some
# primitive, machinery state or lowlevel task that should never
# occur.
# '''
class TransportClosed(trio.ClosedResourceError):
@ -944,8 +952,7 @@ def _raise_from_unexpected_msg(
src_err: AttributeError,
log: StackLevelAdapter, # caller specific `log` obj
expect_msg: str = Yield,
stream: MsgStream | None = None,
expect_msg: Type[MsgType],
# allow "deeper" tbs when debugging B^o
hide_tb: bool = True,
@ -987,6 +994,8 @@ def _raise_from_unexpected_msg(
) from src_err
# TODO: test that shows stream raising an expected error!!!
stream: MsgStream|None
_type: str = 'Context'
# raise the error message in a boxed exception type!
if isinstance(msg, Error):
@ -1003,12 +1012,13 @@ def _raise_from_unexpected_msg(
# TODO: does it make more sense to pack
# the stream._eoc outside this in the calleer always?
# case Stop():
elif (
elif stream := ctx._stream:
_type: str = 'MsgStream'
if (
stream._eoc
or
isinstance(msg, Stop)
or (
stream
and stream._eoc
)
):
log.debug(
f'Context[{cid}] stream was stopped by remote side\n'
@ -1040,22 +1050,16 @@ def _raise_from_unexpected_msg(
ctx.maybe_raise()
raise eoc from src_err
if (
stream
and stream._closed
):
# TODO: our own error subtype?
raise trio.ClosedResourceError(
'This stream was closed'
)
# TODO: our own transport/IPC-broke error subtype?
if stream._closed:
raise trio.ClosedResourceError('This stream was closed')
# always re-raise the source error if no translation error case
# is activated above.
_type: str = 'Stream' if stream else 'Context'
raise MessagingError(
f"{_type} was expecting a {expect_msg} message"
" BUT received a non-error msg:\n"
f'{pformat(msg)}'
f'{_type} was expecting a {expect_msg.__name__!r} message'
' BUT received a non-error msg:\n\n'
f'{struct_format(msg)}'
) from src_err
@ -1088,13 +1092,11 @@ def _mk_msg_type_err(
# no src error from `msgspec.msgpack.Decoder.decode()` so
# prolly a manual type-check on our part.
if message is None:
fmt_spec: str = codec.pformat_msg_spec()
fmt_stack: str = (
'\n'.join(traceback.format_stack(limit=3))
)
tb_fmt: str = pformat_boxed_tb(
tb_str=fmt_stack,
# fields_str=header,
field_prefix=' ',
indent='',
)
@ -1102,8 +1104,7 @@ def _mk_msg_type_err(
f'invalid msg -> {msg}: {type(msg)}\n\n'
f'{tb_fmt}\n'
f'Valid IPC msgs are:\n\n'
# f' ------ - ------\n'
f'{fmt_spec}\n',
f'{codec.msg_spec_str}\n',
)
elif src_type_error:
src_message: str = str(src_type_error)

View File

@ -420,7 +420,6 @@ class Portal:
kwargs=kwargs,
portal=self,
)
ctx._portal = self
# ensure receive-only stream entrypoint
assert ctx._remote_func_type == 'asyncgen'
@ -430,9 +429,10 @@ class Portal:
async with MsgStream(
ctx=ctx,
rx_chan=ctx._rx_chan,
) as rchan:
self._streams.add(rchan)
yield rchan
) as stream:
self._streams.add(stream)
ctx._stream = stream
yield stream
finally:
@ -454,7 +454,7 @@ class Portal:
# XXX: should this always be done?
# await recv_chan.aclose()
self._streams.remove(rchan)
self._streams.remove(stream)
# NOTE: impl is found in `._context`` mod to make
# reading/groking the details simpler code-org-wise. This

View File

@ -181,12 +181,11 @@ async def _invoke_non_context(
# way: using the linked IPC context machinery.
failed_resp: bool = False
try:
await chan.send(
StartAck(
ack = StartAck(
cid=cid,
functype='asyncfunc',
)
)
await chan.send(ack)
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
@ -194,12 +193,11 @@ async def _invoke_non_context(
) as ipc_err:
failed_resp = True
if is_rpc:
raise
raise ipc_err
else:
# TODO: should this be an `.exception()` call?
log.warning(
f'Failed to respond to non-rpc request: {func}\n'
f'{ipc_err}'
log.exception(
f'Failed to respond to runtime RPC request for\n\n'
f'{ack}\n'
)
with cancel_scope as cs:
@ -220,20 +218,19 @@ async def _invoke_non_context(
and chan.connected()
):
try:
await chan.send(
return_msg(
ret_msg = return_msg(
cid=cid,
pld=result,
)
)
await chan.send(ret_msg)
except (
BrokenPipeError,
trio.BrokenResourceError,
):
log.warning(
'Failed to return result:\n'
f'{func}@{actor.uid}\n'
f'remote chan: {chan.uid}'
'Failed to send RPC result?\n'
f'|_{func}@{actor.uid}() -> {ret_msg}\n\n'
f'x=> peer: {chan.uid}\n'
)
@acm
@ -250,7 +247,7 @@ async def _errors_relayed_via_ipc(
] = trio.TASK_STATUS_IGNORED,
) -> None:
__tracebackhide__: bool = hide_tb # TODO: use hide_tb here?
__tracebackhide__: bool = hide_tb
try:
yield # run RPC invoke body
@ -262,23 +259,19 @@ async def _errors_relayed_via_ipc(
KeyboardInterrupt,
) as err:
# always hide this frame from debug REPL if the crash
# originated from an rpc task and we DID NOT fail due to
# an IPC transport error!
# NOTE: always hide this frame from debug REPL call stack
# if the crash originated from an RPC task and we DID NOT
# fail due to an IPC transport error!
if (
is_rpc
and chan.connected()
and
chan.connected()
):
__tracebackhide__: bool = hide_tb
if not is_multi_cancelled(err):
# TODO: maybe we'll want different "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ?
# if not isinstance(err, trio.ClosedResourceError) and (
# if not is_multi_cancelled(err) and (
if not is_multi_cancelled(err):
entered_debug: bool = False
if (
(
@ -310,16 +303,15 @@ async def _errors_relayed_via_ipc(
# strange bug in our transport layer itself? Going
# to keep this open ended for now.
entered_debug = await _debug._maybe_enter_pm(err)
if not entered_debug:
log.exception(
'RPC task crashed\n'
f'|_{ctx}'
)
# always (try to) ship RPC errors back to caller
# ALWAYS try to ship RPC errors back to parent/caller task
if is_rpc:
#
# TODO: tests for this scenario:
# - RPC caller closes connection before getting a response
# should **not** crash this actor..
@ -331,33 +323,41 @@ async def _errors_relayed_via_ipc(
hide_tb=hide_tb,
)
# error is probably from above coro running code *not from
# the target rpc invocation since a scope was never
# allocated around the coroutine await.
# if the ctx cs is NOT allocated, the error is likely from
# above `coro` invocation machinery NOT from inside the
# `coro` itself, i.e. err is NOT a user application error.
if ctx._scope is None:
# we don't ever raise directly here to allow the
# msg-loop-scheduler to continue running for this
# channel.
task_status.started(err)
# always reraise KBIs so they propagate at the sys-process
# level.
# always reraise KBIs so they propagate at the sys-process level.
if isinstance(err, KeyboardInterrupt):
raise
# RPC task bookeeping
# RPC task bookeeping.
# since RPC tasks are scheduled inside a flat
# `Actor._service_n`, we add "handles" to each such that
# they can be individually ccancelled.
finally:
try:
ctx, func, is_complete = actor._rpc_tasks.pop(
ctx: Context
func: Callable
is_complete: trio.Event
(
ctx,
func,
is_complete,
) = actor._rpc_tasks.pop(
(chan, ctx.cid)
)
is_complete.set()
except KeyError:
if is_rpc:
# If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet
if is_rpc:
log.warning(
'RPC task likely errored or cancelled before start?'
f'|_{ctx._task}\n'
@ -372,7 +372,7 @@ async def _errors_relayed_via_ipc(
finally:
if not actor._rpc_tasks:
log.runtime("All RPC tasks have completed")
log.runtime('All RPC tasks have completed')
actor._ongoing_rpc_tasks.set()
@ -414,19 +414,16 @@ async def _invoke(
# TODO: possibly a specially formatted traceback
# (not sure what typing is for this..)?
# tb = None
# tb: TracebackType = None
cancel_scope = CancelScope()
# activated cancel scope ref
cs: CancelScope|None = None
cs: CancelScope|None = None # ref when activated
ctx = actor.get_context(
chan=chan,
cid=cid,
nsf=NamespacePath.from_ref(func),
# TODO: if we wanted to get cray and support it?
# side='callee',
# NOTE: no portal passed bc this is the "child"-side
# We shouldn't ever need to pass this through right?
# it's up to the soon-to-be called rpc task to
@ -459,8 +456,8 @@ async def _invoke(
kwargs['stream'] = ctx
elif getattr(func, '_tractor_context_function', False):
# handle decorated ``@tractor.context`` async function
elif getattr(func, '_tractor_context_function', False):
kwargs['ctx'] = ctx
context = True
@ -474,7 +471,8 @@ async def _invoke(
task_status=task_status,
):
if not (
inspect.isasyncgenfunction(func) or
inspect.isasyncgenfunction(func)
or
inspect.iscoroutinefunction(func)
):
raise TypeError(f'{func} must be an async function!')
@ -486,8 +484,7 @@ async def _invoke(
except TypeError:
raise
# TODO: implement all these cases in terms of the
# `Context` one!
# TODO: impl all these cases in terms of the `Context` one!
if not context:
await _invoke_non_context(
actor,
@ -503,7 +500,7 @@ async def _invoke(
return_msg,
task_status,
)
# below is only for `@context` funcs
# XXX below fallthrough is ONLY for `@context` eps
return
# our most general case: a remote SC-transitive,
@ -580,9 +577,6 @@ async def _invoke(
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
# which cancels the scope presuming the input error
# is not a `.cancel_acked` pleaser.
# - currently a never-should-happen-fallthrough case
# inside ._context._drain_to_final_msg()`..
# # TODO: remove this ^ right?
if ctx._scope.cancelled_caught:
our_uid: tuple = actor.uid
@ -598,9 +592,7 @@ async def _invoke(
if cs.cancel_called:
canceller: tuple = ctx.canceller
msg: str = (
'actor was cancelled by '
)
explain: str = f'{ctx.side!r}-side task was cancelled by '
# NOTE / TODO: if we end up having
# ``Actor._cancel_task()`` call
@ -610,22 +602,28 @@ async def _invoke(
if ctx._cancel_called:
# TODO: test for this!!!!!
canceller: tuple = our_uid
msg += 'itself '
explain += 'itself '
# if the channel which spawned the ctx is the
# one that cancelled it then we report that, vs.
# it being some other random actor that for ex.
# some actor who calls `Portal.cancel_actor()`
# and by side-effect cancels this ctx.
#
# TODO: determine if the ctx peer task was the
# exact task which cancelled, vs. some other
# task in the same actor.
elif canceller == ctx.chan.uid:
msg += 'its caller'
explain += f'its {ctx.peer_side!r}-side peer'
else:
msg += 'a remote peer'
explain += 'a remote peer'
# TODO: move this "div centering" into
# a helper for use elsewhere!
div_chars: str = '------ - ------'
div_offset: int = (
round(len(msg)/2)+1
round(len(explain)/2)+1
+
round(len(div_chars)/2)+1
)
@ -636,11 +634,12 @@ async def _invoke(
+
f'{div_chars}\n'
)
msg += (
explain += (
div_str +
f'<= canceller: {canceller}\n'
f'=> uid: {our_uid}\n'
f' |_{ctx._task}()'
f'=> cancellee: {our_uid}\n'
# TODO: better repr for ctx tasks..
f' |_{ctx.side!r} {ctx._task}'
# TODO: instead just show the
# ctx.__str__() here?
@ -659,7 +658,7 @@ async def _invoke(
# task, so relay this cancel signal to the
# other side.
ctxc = ContextCancelled(
message=msg,
message=explain,
boxed_type=trio.Cancelled,
canceller=canceller,
)
@ -702,11 +701,9 @@ async def _invoke(
ctx: Context = actor._contexts.pop((
chan.uid,
cid,
# ctx.side,
))
merr: Exception|None = ctx.maybe_error
(
res_type_str,
res_str,
@ -720,7 +717,7 @@ async def _invoke(
)
log.runtime(
f'IPC context terminated with a final {res_type_str}\n\n'
f'{ctx}\n'
f'{ctx}'
)
@ -806,13 +803,19 @@ async def process_messages(
and `Actor.cancel()` process-wide-runtime-shutdown requests
(as utilized inside `Portal.cancel_actor()` ).
'''
assert actor._service_n # state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we
# should use it?
# https://github.com/python-trio/trio/issues/467
# -[ ] existing GH https://github.com/python-trio/trio/issues/467
# -[ ] for other transports (like QUIC) we can possibly just
# entirely avoid the feeder mem-chans since each msg will be
# delivered with a ctx-id already?
#
# |_ for ex, from `aioquic` which exposed "stream ids":
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
log.runtime(
'Entering RPC msg loop:\n'
f'peer: {chan.uid}\n'
@ -850,7 +853,7 @@ async def process_messages(
| Return(cid=cid)
| CancelAck(cid=cid)
# `.cid` means RPC-ctx-task specific
# `.cid` indicates RPC-ctx-task scoped
| Error(cid=cid)
# recv-side `MsgType` decode violation
@ -1046,16 +1049,16 @@ async def process_messages(
trio.Event(),
)
# runtime-scoped remote error (since no `.cid`)
# runtime-scoped remote (internal) error
# (^- bc no `Error.cid` -^)
#
# NOTE: this is the non-rpc error case, that
# is, an error NOT raised inside a call to
# `_invoke()` (i.e. no cid was provided in the
# msg - see above). Raise error inline and
# mark the channel as "globally errored" for
# all downstream consuming primitives.
case Error():
# NOTE: this is the non-rpc error case,
# that is, an error **not** raised inside
# a call to ``_invoke()`` (i.e. no cid was
# provided in the msg - see above). Push
# this error to all local channel
# consumers (normally portals) by marking
# the channel as errored
# assert chan.uid
chan._exc: Exception = unpack_error(
msg,
chan=chan,
@ -1111,7 +1114,7 @@ async def process_messages(
f'|_{chan.raddr}\n'
)
# transport **was** disconnected
# transport **WAS** disconnected
return True
except (
@ -1150,12 +1153,11 @@ async def process_messages(
finally:
# msg debugging for when he machinery is brokey
log.runtime(
'Exiting IPC msg loop with\n'
f'peer: {chan.uid}\n'
'Exiting IPC msg loop with final msg\n\n'
f'<= peer: {chan.uid}\n'
f'|_{chan}\n\n'
'final msg:\n'
f'{pformat(msg)}\n'
f'{pformat(msg)}\n\n'
)
# transport **was not** disconnected
# transport **WAS NOT** disconnected
return False

View File

@ -850,7 +850,7 @@ class Actor:
msg_buffer_size: int|None = None,
allow_overruns: bool = False,
load_nsf: bool = False,
ack_timeout: float = 3,
ack_timeout: float = float('inf'),
) -> Context:
'''

View File

@ -37,6 +37,11 @@ from ._codec import (
MsgDec as MsgDec,
current_codec as current_codec,
)
# currently can't bc circular with `._context`
# from ._ops import (
# PldRx as PldRx,
# _drain_to_final_msg as _drain_to_final_msg,
# )
from .types import (
Msg as Msg,

View File

@ -75,7 +75,7 @@ log = get_logger(__name__)
# TODO: unify with `MsgCodec` by making `._dec` part this?
class MsgDec(Struct):
'''
An IPC msg decoder.
An IPC msg (payload) decoder.
Normally used to decode only a payload: `MsgType.pld:
PayloadT` field before delivery to IPC consumer code.
@ -87,6 +87,31 @@ class MsgDec(Struct):
def dec(self) -> msgpack.Decoder:
return self._dec
def __repr__(self) -> str:
speclines: str = self.spec_str
# in multi-typed spec case we stick the list
# all on newlines after the |__pld_spec__:,
# OW it's prolly single type spec-value
# so just leave it on same line.
if '\n' in speclines:
speclines: str = '\n' + textwrap.indent(
speclines,
prefix=' '*3,
)
body: str = textwrap.indent(
f'|_dec_hook: {self.dec.dec_hook}\n'
f'|__pld_spec__: {speclines}\n',
prefix=' '*2,
)
return (
f'<{type(self).__name__}(\n'
f'{body}'
')>'
)
# struct type unions
# https://jcristharif.com/msgspec/structs.html#tagged-unions
#
@ -137,17 +162,7 @@ class MsgDec(Struct):
# TODO: would get moved into `FieldSpec.__str__()` right?
@property
def spec_str(self) -> str:
# TODO: could also use match: instead?
spec: Union[Type]|Type = self.spec
# `typing.Union` case
if getattr(spec, '__args__', False):
return str(spec)
# just a single type
else:
return spec.__name__
return pformat_msgspec(codec=self)
pld_spec_str = spec_str
@ -168,10 +183,58 @@ def mk_dec(
) -> MsgDec:
return msgpack.Decoder(
return MsgDec(
_dec=msgpack.Decoder(
type=spec, # like `Msg[Any]`
dec_hook=dec_hook,
)
)
def mk_msgspec_table(
dec: msgpack.Decoder,
msg: MsgType|None = None,
) -> dict[str, MsgType]|str:
'''
Fill out a `dict` of `MsgType`s keyed by name
for a given input `msgspec.msgpack.Decoder`
as defined by its `.type: Union[Type]` setting.
If `msg` is provided, only deliver a `dict` with a single
entry for that type.
'''
msgspec: Union[Type]|Type = dec.type
if not (msgtypes := getattr(msgspec, '__args__', False)):
msgtypes = [msgspec]
msgt_table: dict[str, MsgType] = {
msgt: str(msgt)
for msgt in msgtypes
}
if msg:
msgt: MsgType = type(msg)
str_repr: str = msgt_table[msgt]
return {msgt: str_repr}
return msgt_table
def pformat_msgspec(
codec: MsgCodec|MsgDec,
msg: MsgType|None = None,
join_char: str = '\n',
) -> str:
dec: msgpack.Decoder = getattr(codec, 'dec', codec)
return join_char.join(
mk_msgspec_table(
dec=dec,
msg=msg,
).values()
)
# TODO: overall IPC msg-spec features (i.e. in this mod)!
#
@ -200,7 +263,7 @@ class MsgCodec(Struct):
def __repr__(self) -> str:
speclines: str = textwrap.indent(
self.pformat_msg_spec(),
pformat_msgspec(codec=self),
prefix=' '*3,
)
body: str = textwrap.indent(
@ -244,33 +307,11 @@ class MsgCodec(Struct):
# NOTE: defined and applied inside `mk_codec()`
return self._dec.type
def msg_spec_items(
self,
msg: MsgType|None = None,
) -> dict[str, MsgType]|str:
msgt_table: dict[str, MsgType] = {
msgt: str(msgt)
for msgt in self.msg_spec.__args__
}
if msg:
msgt: MsgType = type(msg)
str_repr: str = msgt_table[msgt]
return {msgt: str_repr}
return msgt_table
# TODO: some way to make `pretty_struct.Struct` use this
# wrapped field over the `.msg_spec` one?
def pformat_msg_spec(
self,
msg: MsgType|None = None,
join_char: str = '\n',
) -> str:
return join_char.join(
self.msg_spec_items(msg=msg).values()
)
@property
def msg_spec_str(self) -> str:
return pformat_msgspec(self.msg_spec)
lib: ModuleType = msgspec
@ -280,16 +321,31 @@ class MsgCodec(Struct):
def enc(self) -> msgpack.Encoder:
return self._enc
# TODO: reusing encode buffer for perf?
# https://jcristharif.com/msgspec/perf-tips.html#reusing-an-output-buffer
_buf: bytearray = bytearray()
def encode(
self,
py_obj: Any,
use_buf: bool = False,
# ^-XXX-^ uhh why am i getting this?
# |_BufferError: Existing exports of data: object cannot be re-sized
) -> bytes:
'''
Encode input python objects to `msgpack` bytes for
transfer on a tranport protocol connection.
When `use_buf == True` use the output buffer optimization:
https://jcristharif.com/msgspec/perf-tips.html#reusing-an-output-buffer
'''
if use_buf:
self._enc.encode_into(py_obj, self._buf)
return self._buf
else:
return self._enc.encode(py_obj)
@property

View File

@ -146,7 +146,11 @@ class PldRx(Struct):
# sync-rx msg from underlying IPC feeder (mem-)chan
ctx._rx_chan.receive_nowait()
)
return self.dec_msg(msg)
return self.dec_msg(
msg,
ctx=ctx,
expect_msg=expect_msg,
)
async def recv_pld(
self,

View File

@ -102,6 +102,59 @@ def iter_fields(struct: Struct) -> Iterator[
)
def pformat(
struct: Struct,
field_indent: int = 2,
indent: int = 0,
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
# global whitespace indent
ws: str = ' '*indent
# field whitespace indent
field_ws: str = ' '*(field_indent + indent)
# qtn: str = ws + struct.__class__.__qualname__
qtn: str = struct.__class__.__qualname__
obj_str: str = '' # accumulator
fi: structs.FieldInfo
k: str
v: Any
for fi, k, v in iter_fields(struct):
# TODO: how can we prefer `Literal['option1', 'option2,
# ..]` over .__name__ == `Literal` but still get only the
# latter for simple types like `str | int | None` etc..?
ft: type = fi.type
typ_name: str = getattr(ft, '__name__', str(ft))
# recurse to get sub-struct's `.pformat()` output Bo
if isinstance(v, Struct):
val_str: str = v.pformat(
indent=field_indent + indent,
field_indent=indent + field_indent,
)
else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
val_str: str = saferepr(v)
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
return (
f'{qtn}(\n'
f'{obj_str}'
f'{ws})'
)
class Struct(
_Struct,
@ -140,65 +193,12 @@ class Struct(
return sin_props
# TODO: make thisi a mod-func!
def pformat(
self,
field_indent: int = 2,
indent: int = 0,
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
# global whitespace indent
ws: str = ' '*indent
# field whitespace indent
field_ws: str = ' '*(field_indent + indent)
# qtn: str = ws + self.__class__.__qualname__
qtn: str = self.__class__.__qualname__
obj_str: str = '' # accumulator
fi: structs.FieldInfo
k: str
v: Any
for fi, k, v in iter_fields(self):
# TODO: how can we prefer `Literal['option1', 'option2,
# ..]` over .__name__ == `Literal` but still get only the
# latter for simple types like `str | int | None` etc..?
ft: type = fi.type
typ_name: str = getattr(ft, '__name__', str(ft))
# recurse to get sub-struct's `.pformat()` output Bo
if isinstance(v, Struct):
val_str: str = v.pformat(
indent=field_indent + indent,
field_indent=indent + field_indent,
)
else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
val_str: str = saferepr(v)
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
return (
f'{qtn}(\n'
f'{obj_str}'
f'{ws})'
)
pformat = pformat
# __str__ = __repr__ = pformat
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
# __str__ = __repr__ = pformat
__repr__ = pformat
def copy(