Compare commits

..

1 Commits

Author SHA1 Message Date
Tyler Goodlet 6e0ef76128 First draft "payload receiver" in a new `.msg._ops`
After much tinkering, re-design and rubber-ducking via many
"commit msg novellas", **finally** this adds the (hopefully) final
missing layer for typed msg safety: `tractor.msg._ops.PldRx`

(or `PayloadReceiver`? haven't decided how verbose to go..)

Design justification summary:
      ------ - ------
- need a way to sit as close as possible to the `tractor` application
  layer such that when `MsgType.pld: PayloadT` validation takes place,
  it is straightforward and obvious how user code can decide to handle
  any resulting `MsgTypeError`.
- there should be a common and optional-yet-modular way to modify
  **how** data delivered via IPC (possibly embedded as user-defined,
  type-constrained `.pld: msgspec.Struct`s) can be handled and processed
  during fault conditions and/or IPC "msg attacks".
- support for nested type constraints within a `MsgType.pld` field
  should be simple to define, implement and understand at runtime (a
  generic sketch follows right after this list).
- a layer between the app-level IPC primitive APIs
  (`Context`/`MsgStream`) and application-task code (the consumer code
  of those APIs) should be easily customizable and should prove itself
  as such through demonstrably rigorous internal (sub-system) use!
  -> eg. via seamless runtime RPC endpoint support like `Actor.cancel()`
  -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt
     dialog protocol, via a dead simple payload-as-ctl-msg spec.
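
To make the above points concrete, here is a minimal, `msgspec`-only
illustration (**not** `tractor`'s actual API; the `Point`/`MovePld`
names are made up) of the kind of nested, type-constrained `.pld` spec
meant above, and the validation failure an app task would be handed
(boxed as a `MsgTypeError` by `tractor`):

    import msgspec


    class Point(msgspec.Struct):
        x: float
        y: float


    class MovePld(msgspec.Struct):
        # nested constraint: payload must be a list of `Point`s
        waypoints: list[Point]


    # a well-formed payload converts/validates cleanly
    ok: MovePld = msgspec.convert(
        {'waypoints': [{'x': 0.0, 'y': 1.0}]},
        type=MovePld,
    )

    # a malformed payload raises `msgspec.ValidationError`, which
    # would surface to the app task as a `MsgTypeError`
    try:
        msgspec.convert(
            {'waypoints': [{'x': 1.0, 'y': 'nope'}]},
            type=MovePld,
        )
    except msgspec.ValidationError as err:
        print(f'payload rejected: {err}')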

There are some fairly detailed doc strings included so I won't duplicate
that content here. The majority of the work is really a factoring of the
many similar blocks that each do more or less the same
`msg = await Context._rx_chan.receive()` plus boilerplate for
`Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new
`PldRx` provides a shim layer for this common "receive msg, decode its
payload, yield it up to the consuming app task" pattern by pairing the
RPC feeder mem-chan with a msg-payload decoder, so that IPC API
internals use **one** API instead of re-implementing the same pattern
all over the place XD (a rough, standalone sketch follows below)
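
A rough, self-contained sketch of that shim (simplified stand-in msg
types and a trimmed `recv_pld()` body for illustration, **not** the
actual internals):

    from typing import Any

    import msgspec
    import trio


    class Yield(msgspec.Struct, tag=True):
        cid: str
        pld: Any  # payload, validated against the per-ctx spec


    class Stop(msgspec.Struct, tag=True):
        cid: str


    class Error(msgspec.Struct, tag=True):
        cid: str
        message: str


    class PldRx:
        '''
        Pair the RPC feeder mem-chan's output with a payload-spec
        validator so that every IPC primitive shares ONE
        recv-decode-deliver code path.

        '''
        def __init__(self, pld_spec: type) -> None:
            self.pld_spec = pld_spec

        async def recv_pld(
            self,
            rx_chan: trio.MemoryReceiveChannel,
        ) -> Any:
            msg = await rx_chan.receive()
            match msg:
                case Yield(pld=pld):
                    # raises `msgspec.ValidationError` on a spec
                    # mismatch (boxed as a `MsgTypeError` in the
                    # real thing)
                    return msgspec.convert(pld, type=self.pld_spec)

                case Stop():
                    raise trio.EndOfChannel

                case Error(message=message):
                    raise RuntimeError(message)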

`PldRx` breakdown
 ------ - ------
- for now only expects a `._msgdec: MsgDec` which allows for
  overridable `MsgType.pld` validation and is most obviously used in
  the impl of `.dec_msg()`, the decode-message method.
- provides multiple mem-chan receive options including:
 |_ `.recv_pld()` which does the e2e operation of receiving a payload
    item.
 |_ a sync `.recv_pld_nowait()` version.
 |_ a `.recv_msg_w_pld()` which optionally allows retrieving both the
    shuttling `MsgType` as well as its `.pld` body for use cases where
    info on both is important (eg. draining a `MsgStream`); see the
    usage sketch right after this list.
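
Continuing the stand-in sketch from above (again illustrative only; in
the real impl these are `PldRx` methods and the bodies differ), roughly
how those extra receive variants compose:

    async def recv_msg_w_pld(
        pldrx: PldRx,
        rx_chan: trio.MemoryReceiveChannel,
    ) -> tuple[Yield, Any]:
        # deliver both the shuttling msg AND its validated `.pld`,
        # eg. when draining a `MsgStream` and the envelope matters
        # too (error/stop handling elided for brevity)
        msg = await rx_chan.receive()
        pld = msgspec.convert(msg.pld, type=pldrx.pld_spec)
        return msg, pld


    def recv_pld_nowait(
        pldrx: PldRx,
        rx_chan: trio.MemoryReceiveChannel,
    ) -> Any:
        # sync variant built on the mem-chan's `.receive_nowait()`
        msg = rx_chan.receive_nowait()
        return msgspec.convert(msg.pld, type=pldrx.pld_spec)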

Dirty internal changeover/implementation deatz:
             ------ - ------
- obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield
  logic:
 - `MsgStream.receive[_nowait]()` delegating instead to the equivalent
   `PldRx.recv_pld[_nowait]()`.
 - add `Context._pld_rx: PldRx`, created and passed in by
   `mk_context()`; use it for the `.started()` -> `first: Started`
   retrieval inside `open_context_from_portal()`.
 - all the relevant `Portal` invocation methods: `.result()`,
   `.run_from_ns()`, `.run()`; this also allows dropping `_unwrap_msg()`
   and `Portal._return_once()` outright Bo
- rename `Context._recv_chan` -> `._rx_chan`.
- add detailed `Context._scope` info for logging whether or not it's
  cancelled inside `_maybe_cancel_and_set_remote_error()`.
- move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()`
  since it's really not necessarily ctx specific per se, and it does
  kinda fit with "msg operations" more abstractly ;)
2024-04-23 20:56:56 -04:00
9 changed files with 280 additions and 348 deletions

View File

@ -472,17 +472,13 @@ class Context:
return 'parent' if self._portal else 'child' return 'parent' if self._portal else 'child'
@staticmethod @staticmethod
def _peer_side(side: str) -> str: def peer_side(side: str) -> str:
match side: match side:
case 'child': case 'child':
return 'parent' return 'parent'
case 'parent': case 'parent':
return 'child' return 'child'
@property
def peer_side(self) -> str:
return self._peer_side(self.side)
# TODO: remove stat! # TODO: remove stat!
# -[ ] re-implement the `.experiemental._pubsub` stuff # -[ ] re-implement the `.experiemental._pubsub` stuff
# with `MsgStream` and that should be last usage? # with `MsgStream` and that should be last usage?
@ -516,7 +512,9 @@ class Context:
equiv of a `StopIteration`. equiv of a `StopIteration`.
''' '''
await self.chan.send(Stop(cid=self.cid)) await self.chan.send(
Stop(cid=self.cid)
)
def _maybe_cancel_and_set_remote_error( def _maybe_cancel_and_set_remote_error(
self, self,
@ -595,6 +593,7 @@ class Context:
# TODO: never do this right? # TODO: never do this right?
# if self._remote_error: # if self._remote_error:
# return # return
peer_side: str = self.peer_side(self.side)
# XXX: denote and set the remote side's error so that # XXX: denote and set the remote side's error so that
# after we cancel whatever task is the opener of this # after we cancel whatever task is the opener of this
@ -602,7 +601,7 @@ class Context:
# appropriately. # appropriately.
log.runtime( log.runtime(
'Setting remote error for ctx\n\n' 'Setting remote error for ctx\n\n'
f'<= {self.peer_side!r}: {self.chan.uid}\n' f'<= {peer_side!r}: {self.chan.uid}\n'
f'=> {self.side!r}\n\n' f'=> {self.side!r}\n\n'
f'{error}' f'{error}'
) )
@ -624,8 +623,9 @@ class Context:
elif isinstance(error, MsgTypeError): elif isinstance(error, MsgTypeError):
msgerr = True msgerr = True
peer_side: str = self.peer_side(self.side)
log.error( log.error(
f'IPC dialog error due to msg-type caused by {self.peer_side!r} side\n\n' f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n'
f'{error}\n' f'{error}\n'
f'{pformat(self)}\n' f'{pformat(self)}\n'
@ -1067,12 +1067,12 @@ class Context:
except trio.EndOfChannel as eoc: except trio.EndOfChannel as eoc:
if ( if (
eoc eoc
and and stream.closed
stream.closed
): ):
# sanity, can remove? # sanity, can remove?
assert eoc is stream._eoc assert eoc is stream._eoc
# from .devx import pause
# await pause()
log.warning( log.warning(
'Stream was terminated by EoC\n\n' 'Stream was terminated by EoC\n\n'
# NOTE: won't show the error <Type> but # NOTE: won't show the error <Type> but
@ -1644,9 +1644,10 @@ class Context:
side: str = self.side side: str = self.side
if side == 'child': if side == 'child':
assert not self._portal assert not self._portal
peer_side: str = self.peer_side(side)
flow_body: str = ( flow_body: str = (
f'<= peer {self.peer_side!r}: {from_uid}\n' f'<= peer {peer_side!r}: {from_uid}\n'
f' |_<{nsf}()>\n\n' f' |_<{nsf}()>\n\n'
f'=> {side!r}: {self._task}\n' f'=> {side!r}: {self._task}\n'
@ -1664,7 +1665,7 @@ class Context:
log_meth = log.runtime log_meth = log.runtime
log_meth( log_meth(
f'Delivering IPC ctx error from {self.peer_side!r} to {side!r} task\n\n' f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n'
f'{flow_body}' f'{flow_body}'
@ -2329,7 +2330,7 @@ async def open_context_from_portal(
and ctx.cancel_acked and ctx.cancel_acked
): ):
log.cancel( log.cancel(
'Context cancelled by {ctx.side!r}-side task\n' 'Context cancelled by caller task\n'
f'|_{ctx._task}\n\n' f'|_{ctx._task}\n\n'
f'{repr(scope_err)}\n' f'{repr(scope_err)}\n'
@ -2363,7 +2364,6 @@ async def open_context_from_portal(
None, None,
) )
def mk_context( def mk_context(
chan: Channel, chan: Channel,
cid: str, cid: str,

View File

@ -54,7 +54,6 @@ from tractor.msg import (
from tractor.msg.pretty_struct import ( from tractor.msg.pretty_struct import (
iter_fields, iter_fields,
Struct, Struct,
pformat as struct_format,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
@ -109,10 +108,6 @@ _body_fields: list[str] = list(
'relay_path', 'relay_path',
'_msg_dict', '_msg_dict',
'cid', 'cid',
# since only ctxc should show it but `Error` does
# have it as an optional field.
'canceller',
} }
) )
@ -387,9 +382,6 @@ class RemoteActorError(Exception):
''' '''
Error type raised by original remote faulting actor. Error type raised by original remote faulting actor.
When the error has only been relayed a single actor-hop
this will be the same as the `.boxed_type`.
''' '''
if self._src_type is None: if self._src_type is None:
self._src_type = get_err_type( self._src_type = get_err_type(
@ -404,8 +396,7 @@ class RemoteActorError(Exception):
String-name of the (last hop's) boxed error type. String-name of the (last hop's) boxed error type.
''' '''
bt: Type[BaseException] = self.boxed_type return self._ipc_msg.boxed_type_str
return str(bt.__name__)
@property @property
def boxed_type(self) -> str: def boxed_type(self) -> str:
@ -501,11 +492,7 @@ class RemoteActorError(Exception):
''' '''
# TODO: use this matryoshka emjoi XD # TODO: use this matryoshka emjoi XD
# => 🪆 # => 🪆
reprol_str: str = ( reprol_str: str = f'{type(self).__name__}('
f'{type(self).__name__}' # type name
f'[{self.boxed_type_str}]' # parameterized by boxed type
'(' # init-style look
)
_repr: str = self._mk_fields_str( _repr: str = self._mk_fields_str(
self.reprol_fields, self.reprol_fields,
end_char=' ', end_char=' ',
@ -545,8 +532,7 @@ class RemoteActorError(Exception):
self, self,
) -> BaseException: ) -> BaseException:
''' '''
Unpack the inner-most source error from it's original IPC Unpack the inner-most source error from it's original IPC msg data.
msg data.
We attempt to reconstruct (as best as we can) the original We attempt to reconstruct (as best as we can) the original
`Exception` from as it would have been raised in the `Exception` from as it would have been raised in the
@ -584,14 +570,6 @@ class RemoteActorError(Exception):
# # boxed_type=get_type_ref(.. # # boxed_type=get_type_ref(..
# raise NotImplementedError # raise NotImplementedError
@property
def sender(self) -> tuple[str, str]|None:
if (
(msg := self._ipc_msg)
and (value := msg.sender)
):
return tuple(value)
class ContextCancelled(RemoteActorError): class ContextCancelled(RemoteActorError):
''' '''
@ -666,8 +644,8 @@ class MsgTypeError(
- `Yield` - `Yield`
- TODO: any embedded `.pld` type defined by user code? - TODO: any embedded `.pld` type defined by user code?
Normally the source of an error is re-raised from some Normally the source of an error is re-raised from some `.msg._codec`
`.msg._codec` decode which itself raises in a backend interchange decode which itself raises in a backend interchange
lib (eg. a `msgspec.ValidationError`). lib (eg. a `msgspec.ValidationError`).
''' '''
@ -756,6 +734,20 @@ class StreamOverrun(
handled by app code using `MsgStream.send()/.receive()`. handled by app code using `MsgStream.send()/.receive()`.
''' '''
@property
def sender(self) -> tuple[str, str] | None:
value = self._ipc_msg.sender
if value:
return tuple(value)
# class InternalActorError(RemoteActorError):
# '''
# Boxed (Remote) internal `tractor` error indicating failure of some
# primitive, machinery state or lowlevel task that should never
# occur.
# '''
class TransportClosed(trio.ClosedResourceError): class TransportClosed(trio.ClosedResourceError):
@ -952,7 +944,8 @@ def _raise_from_unexpected_msg(
src_err: AttributeError, src_err: AttributeError,
log: StackLevelAdapter, # caller specific `log` obj log: StackLevelAdapter, # caller specific `log` obj
expect_msg: Type[MsgType], expect_msg: str = Yield,
stream: MsgStream | None = None,
# allow "deeper" tbs when debugging B^o # allow "deeper" tbs when debugging B^o
hide_tb: bool = True, hide_tb: bool = True,
@ -994,8 +987,6 @@ def _raise_from_unexpected_msg(
) from src_err ) from src_err
# TODO: test that shows stream raising an expected error!!! # TODO: test that shows stream raising an expected error!!!
stream: MsgStream|None
_type: str = 'Context'
# raise the error message in a boxed exception type! # raise the error message in a boxed exception type!
if isinstance(msg, Error): if isinstance(msg, Error):
@ -1012,13 +1003,12 @@ def _raise_from_unexpected_msg(
# TODO: does it make more sense to pack # TODO: does it make more sense to pack
# the stream._eoc outside this in the calleer always? # the stream._eoc outside this in the calleer always?
# case Stop(): # case Stop():
elif stream := ctx._stream: elif (
_type: str = 'MsgStream'
if (
stream._eoc
or
isinstance(msg, Stop) isinstance(msg, Stop)
or (
stream
and stream._eoc
)
): ):
log.debug( log.debug(
f'Context[{cid}] stream was stopped by remote side\n' f'Context[{cid}] stream was stopped by remote side\n'
@ -1050,16 +1040,22 @@ def _raise_from_unexpected_msg(
ctx.maybe_raise() ctx.maybe_raise()
raise eoc from src_err raise eoc from src_err
# TODO: our own transport/IPC-broke error subtype? if (
if stream._closed: stream
raise trio.ClosedResourceError('This stream was closed') and stream._closed
):
# TODO: our own error subtype?
raise trio.ClosedResourceError(
'This stream was closed'
)
# always re-raise the source error if no translation error case # always re-raise the source error if no translation error case
# is activated above. # is activated above.
_type: str = 'Stream' if stream else 'Context'
raise MessagingError( raise MessagingError(
f'{_type} was expecting a {expect_msg.__name__!r} message' f"{_type} was expecting a {expect_msg} message"
' BUT received a non-error msg:\n\n' " BUT received a non-error msg:\n"
f'{struct_format(msg)}' f'{pformat(msg)}'
) from src_err ) from src_err
@ -1092,11 +1088,13 @@ def _mk_msg_type_err(
# no src error from `msgspec.msgpack.Decoder.decode()` so # no src error from `msgspec.msgpack.Decoder.decode()` so
# prolly a manual type-check on our part. # prolly a manual type-check on our part.
if message is None: if message is None:
fmt_spec: str = codec.pformat_msg_spec()
fmt_stack: str = ( fmt_stack: str = (
'\n'.join(traceback.format_stack(limit=3)) '\n'.join(traceback.format_stack(limit=3))
) )
tb_fmt: str = pformat_boxed_tb( tb_fmt: str = pformat_boxed_tb(
tb_str=fmt_stack, tb_str=fmt_stack,
# fields_str=header,
field_prefix=' ', field_prefix=' ',
indent='', indent='',
) )
@ -1104,7 +1102,8 @@ def _mk_msg_type_err(
f'invalid msg -> {msg}: {type(msg)}\n\n' f'invalid msg -> {msg}: {type(msg)}\n\n'
f'{tb_fmt}\n' f'{tb_fmt}\n'
f'Valid IPC msgs are:\n\n' f'Valid IPC msgs are:\n\n'
f'{codec.msg_spec_str}\n', # f' ------ - ------\n'
f'{fmt_spec}\n',
) )
elif src_type_error: elif src_type_error:
src_message: str = str(src_type_error) src_message: str = str(src_type_error)

View File

@ -420,6 +420,7 @@ class Portal:
kwargs=kwargs, kwargs=kwargs,
portal=self, portal=self,
) )
ctx._portal = self
# ensure receive-only stream entrypoint # ensure receive-only stream entrypoint
assert ctx._remote_func_type == 'asyncgen' assert ctx._remote_func_type == 'asyncgen'
@ -429,10 +430,9 @@ class Portal:
async with MsgStream( async with MsgStream(
ctx=ctx, ctx=ctx,
rx_chan=ctx._rx_chan, rx_chan=ctx._rx_chan,
) as stream: ) as rchan:
self._streams.add(stream) self._streams.add(rchan)
ctx._stream = stream yield rchan
yield stream
finally: finally:
@ -454,7 +454,7 @@ class Portal:
# XXX: should this always be done? # XXX: should this always be done?
# await recv_chan.aclose() # await recv_chan.aclose()
self._streams.remove(stream) self._streams.remove(rchan)
# NOTE: impl is found in `._context`` mod to make # NOTE: impl is found in `._context`` mod to make
# reading/groking the details simpler code-org-wise. This # reading/groking the details simpler code-org-wise. This

View File

@ -181,11 +181,12 @@ async def _invoke_non_context(
# way: using the linked IPC context machinery. # way: using the linked IPC context machinery.
failed_resp: bool = False failed_resp: bool = False
try: try:
ack = StartAck( await chan.send(
StartAck(
cid=cid, cid=cid,
functype='asyncfunc', functype='asyncfunc',
) )
await chan.send(ack) )
except ( except (
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
@ -193,11 +194,12 @@ async def _invoke_non_context(
) as ipc_err: ) as ipc_err:
failed_resp = True failed_resp = True
if is_rpc: if is_rpc:
raise ipc_err raise
else: else:
log.exception( # TODO: should this be an `.exception()` call?
f'Failed to respond to runtime RPC request for\n\n' log.warning(
f'{ack}\n' f'Failed to respond to non-rpc request: {func}\n'
f'{ipc_err}'
) )
with cancel_scope as cs: with cancel_scope as cs:
@ -218,19 +220,20 @@ async def _invoke_non_context(
and chan.connected() and chan.connected()
): ):
try: try:
ret_msg = return_msg( await chan.send(
return_msg(
cid=cid, cid=cid,
pld=result, pld=result,
) )
await chan.send(ret_msg) )
except ( except (
BrokenPipeError, BrokenPipeError,
trio.BrokenResourceError, trio.BrokenResourceError,
): ):
log.warning( log.warning(
'Failed to send RPC result?\n' 'Failed to return result:\n'
f'|_{func}@{actor.uid}() -> {ret_msg}\n\n' f'{func}@{actor.uid}\n'
f'x=> peer: {chan.uid}\n' f'remote chan: {chan.uid}'
) )
@acm @acm
@ -247,7 +250,7 @@ async def _errors_relayed_via_ipc(
] = trio.TASK_STATUS_IGNORED, ] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb # TODO: use hide_tb here?
try: try:
yield # run RPC invoke body yield # run RPC invoke body
@ -259,19 +262,23 @@ async def _errors_relayed_via_ipc(
KeyboardInterrupt, KeyboardInterrupt,
) as err: ) as err:
# NOTE: always hide this frame from debug REPL call stack # always hide this frame from debug REPL if the crash
# if the crash originated from an RPC task and we DID NOT # originated from an rpc task and we DID NOT fail due to
# fail due to an IPC transport error! # an IPC transport error!
if ( if (
is_rpc is_rpc
and and chan.connected()
chan.connected()
): ):
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
if not is_multi_cancelled(err):
# TODO: maybe we'll want different "levels" of debugging # TODO: maybe we'll want different "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ? # eventualy such as ('app', 'supervisory', 'runtime') ?
if not is_multi_cancelled(err):
# if not isinstance(err, trio.ClosedResourceError) and (
# if not is_multi_cancelled(err) and (
entered_debug: bool = False entered_debug: bool = False
if ( if (
( (
@ -303,15 +310,16 @@ async def _errors_relayed_via_ipc(
# strange bug in our transport layer itself? Going # strange bug in our transport layer itself? Going
# to keep this open ended for now. # to keep this open ended for now.
entered_debug = await _debug._maybe_enter_pm(err) entered_debug = await _debug._maybe_enter_pm(err)
if not entered_debug: if not entered_debug:
log.exception( log.exception(
'RPC task crashed\n' 'RPC task crashed\n'
f'|_{ctx}' f'|_{ctx}'
) )
# ALWAYS try to ship RPC errors back to parent/caller task # always (try to) ship RPC errors back to caller
if is_rpc: if is_rpc:
#
# TODO: tests for this scenario: # TODO: tests for this scenario:
# - RPC caller closes connection before getting a response # - RPC caller closes connection before getting a response
# should **not** crash this actor.. # should **not** crash this actor..
@ -323,41 +331,33 @@ async def _errors_relayed_via_ipc(
hide_tb=hide_tb, hide_tb=hide_tb,
) )
# if the ctx cs is NOT allocated, the error is likely from # error is probably from above coro running code *not from
# above `coro` invocation machinery NOT from inside the # the target rpc invocation since a scope was never
# `coro` itself, i.e. err is NOT a user application error. # allocated around the coroutine await.
if ctx._scope is None: if ctx._scope is None:
# we don't ever raise directly here to allow the # we don't ever raise directly here to allow the
# msg-loop-scheduler to continue running for this # msg-loop-scheduler to continue running for this
# channel. # channel.
task_status.started(err) task_status.started(err)
# always reraise KBIs so they propagate at the sys-process level. # always reraise KBIs so they propagate at the sys-process
# level.
if isinstance(err, KeyboardInterrupt): if isinstance(err, KeyboardInterrupt):
raise raise
# RPC task bookeeping.
# since RPC tasks are scheduled inside a flat # RPC task bookeeping
# `Actor._service_n`, we add "handles" to each such that
# they can be individually ccancelled.
finally: finally:
try: try:
ctx: Context ctx, func, is_complete = actor._rpc_tasks.pop(
func: Callable
is_complete: trio.Event
(
ctx,
func,
is_complete,
) = actor._rpc_tasks.pop(
(chan, ctx.cid) (chan, ctx.cid)
) )
is_complete.set() is_complete.set()
except KeyError: except KeyError:
if is_rpc:
# If we're cancelled before the task returns then the # If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet # cancel scope will not have been inserted yet
if is_rpc:
log.warning( log.warning(
'RPC task likely errored or cancelled before start?' 'RPC task likely errored or cancelled before start?'
f'|_{ctx._task}\n' f'|_{ctx._task}\n'
@ -372,7 +372,7 @@ async def _errors_relayed_via_ipc(
finally: finally:
if not actor._rpc_tasks: if not actor._rpc_tasks:
log.runtime('All RPC tasks have completed') log.runtime("All RPC tasks have completed")
actor._ongoing_rpc_tasks.set() actor._ongoing_rpc_tasks.set()
@ -414,16 +414,19 @@ async def _invoke(
# TODO: possibly a specially formatted traceback # TODO: possibly a specially formatted traceback
# (not sure what typing is for this..)? # (not sure what typing is for this..)?
# tb: TracebackType = None # tb = None
cancel_scope = CancelScope() cancel_scope = CancelScope()
cs: CancelScope|None = None # ref when activated # activated cancel scope ref
cs: CancelScope|None = None
ctx = actor.get_context( ctx = actor.get_context(
chan=chan, chan=chan,
cid=cid, cid=cid,
nsf=NamespacePath.from_ref(func), nsf=NamespacePath.from_ref(func),
# NOTE: no portal passed bc this is the "child"-side # TODO: if we wanted to get cray and support it?
# side='callee',
# We shouldn't ever need to pass this through right? # We shouldn't ever need to pass this through right?
# it's up to the soon-to-be called rpc task to # it's up to the soon-to-be called rpc task to
@ -456,8 +459,8 @@ async def _invoke(
kwargs['stream'] = ctx kwargs['stream'] = ctx
# handle decorated ``@tractor.context`` async function
elif getattr(func, '_tractor_context_function', False): elif getattr(func, '_tractor_context_function', False):
# handle decorated ``@tractor.context`` async function
kwargs['ctx'] = ctx kwargs['ctx'] = ctx
context = True context = True
@ -471,8 +474,7 @@ async def _invoke(
task_status=task_status, task_status=task_status,
): ):
if not ( if not (
inspect.isasyncgenfunction(func) inspect.isasyncgenfunction(func) or
or
inspect.iscoroutinefunction(func) inspect.iscoroutinefunction(func)
): ):
raise TypeError(f'{func} must be an async function!') raise TypeError(f'{func} must be an async function!')
@ -484,7 +486,8 @@ async def _invoke(
except TypeError: except TypeError:
raise raise
# TODO: impl all these cases in terms of the `Context` one! # TODO: implement all these cases in terms of the
# `Context` one!
if not context: if not context:
await _invoke_non_context( await _invoke_non_context(
actor, actor,
@ -500,7 +503,7 @@ async def _invoke(
return_msg, return_msg,
task_status, task_status,
) )
# XXX below fallthrough is ONLY for `@context` eps # below is only for `@context` funcs
return return
# our most general case: a remote SC-transitive, # our most general case: a remote SC-transitive,
@ -577,6 +580,9 @@ async def _invoke(
# itself calls `ctx._maybe_cancel_and_set_remote_error()` # itself calls `ctx._maybe_cancel_and_set_remote_error()`
# which cancels the scope presuming the input error # which cancels the scope presuming the input error
# is not a `.cancel_acked` pleaser. # is not a `.cancel_acked` pleaser.
# - currently a never-should-happen-fallthrough case
# inside ._context._drain_to_final_msg()`..
# # TODO: remove this ^ right?
if ctx._scope.cancelled_caught: if ctx._scope.cancelled_caught:
our_uid: tuple = actor.uid our_uid: tuple = actor.uid
@ -592,7 +598,9 @@ async def _invoke(
if cs.cancel_called: if cs.cancel_called:
canceller: tuple = ctx.canceller canceller: tuple = ctx.canceller
explain: str = f'{ctx.side!r}-side task was cancelled by ' msg: str = (
'actor was cancelled by '
)
# NOTE / TODO: if we end up having # NOTE / TODO: if we end up having
# ``Actor._cancel_task()`` call # ``Actor._cancel_task()`` call
@ -602,28 +610,22 @@ async def _invoke(
if ctx._cancel_called: if ctx._cancel_called:
# TODO: test for this!!!!! # TODO: test for this!!!!!
canceller: tuple = our_uid canceller: tuple = our_uid
explain += 'itself ' msg += 'itself '
# if the channel which spawned the ctx is the # if the channel which spawned the ctx is the
# one that cancelled it then we report that, vs. # one that cancelled it then we report that, vs.
# it being some other random actor that for ex. # it being some other random actor that for ex.
# some actor who calls `Portal.cancel_actor()` # some actor who calls `Portal.cancel_actor()`
# and by side-effect cancels this ctx. # and by side-effect cancels this ctx.
#
# TODO: determine if the ctx peer task was the
# exact task which cancelled, vs. some other
# task in the same actor.
elif canceller == ctx.chan.uid: elif canceller == ctx.chan.uid:
explain += f'its {ctx.peer_side!r}-side peer' msg += 'its caller'
else: else:
explain += 'a remote peer' msg += 'a remote peer'
# TODO: move this "div centering" into
# a helper for use elsewhere!
div_chars: str = '------ - ------' div_chars: str = '------ - ------'
div_offset: int = ( div_offset: int = (
round(len(explain)/2)+1 round(len(msg)/2)+1
+ +
round(len(div_chars)/2)+1 round(len(div_chars)/2)+1
) )
@ -634,12 +636,11 @@ async def _invoke(
+ +
f'{div_chars}\n' f'{div_chars}\n'
) )
explain += ( msg += (
div_str + div_str +
f'<= canceller: {canceller}\n' f'<= canceller: {canceller}\n'
f'=> cancellee: {our_uid}\n' f'=> uid: {our_uid}\n'
# TODO: better repr for ctx tasks.. f' |_{ctx._task}()'
f' |_{ctx.side!r} {ctx._task}'
# TODO: instead just show the # TODO: instead just show the
# ctx.__str__() here? # ctx.__str__() here?
@ -658,7 +659,7 @@ async def _invoke(
# task, so relay this cancel signal to the # task, so relay this cancel signal to the
# other side. # other side.
ctxc = ContextCancelled( ctxc = ContextCancelled(
message=explain, message=msg,
boxed_type=trio.Cancelled, boxed_type=trio.Cancelled,
canceller=canceller, canceller=canceller,
) )
@ -701,9 +702,11 @@ async def _invoke(
ctx: Context = actor._contexts.pop(( ctx: Context = actor._contexts.pop((
chan.uid, chan.uid,
cid, cid,
# ctx.side,
)) ))
merr: Exception|None = ctx.maybe_error merr: Exception|None = ctx.maybe_error
( (
res_type_str, res_type_str,
res_str, res_str,
@ -717,7 +720,7 @@ async def _invoke(
) )
log.runtime( log.runtime(
f'IPC context terminated with a final {res_type_str}\n\n' f'IPC context terminated with a final {res_type_str}\n\n'
f'{ctx}' f'{ctx}\n'
) )
@ -803,19 +806,13 @@ async def process_messages(
and `Actor.cancel()` process-wide-runtime-shutdown requests and `Actor.cancel()` process-wide-runtime-shutdown requests
(as utilized inside `Portal.cancel_actor()` ). (as utilized inside `Portal.cancel_actor()` ).
''' '''
assert actor._service_n # state sanity assert actor._service_n # state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we # TODO: once `trio` get's an "obvious way" for req/resp we
# should use it? # should use it?
# -[ ] existing GH https://github.com/python-trio/trio/issues/467 # https://github.com/python-trio/trio/issues/467
# -[ ] for other transports (like QUIC) we can possibly just
# entirely avoid the feeder mem-chans since each msg will be
# delivered with a ctx-id already?
#
# |_ for ex, from `aioquic` which exposed "stream ids":
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
log.runtime( log.runtime(
'Entering RPC msg loop:\n' 'Entering RPC msg loop:\n'
f'peer: {chan.uid}\n' f'peer: {chan.uid}\n'
@ -853,7 +850,7 @@ async def process_messages(
| Return(cid=cid) | Return(cid=cid)
| CancelAck(cid=cid) | CancelAck(cid=cid)
# `.cid` indicates RPC-ctx-task scoped # `.cid` means RPC-ctx-task specific
| Error(cid=cid) | Error(cid=cid)
# recv-side `MsgType` decode violation # recv-side `MsgType` decode violation
@ -1049,16 +1046,16 @@ async def process_messages(
trio.Event(), trio.Event(),
) )
# runtime-scoped remote (internal) error # runtime-scoped remote error (since no `.cid`)
# (^- bc no `Error.cid` -^)
#
# NOTE: this is the non-rpc error case, that
# is, an error NOT raised inside a call to
# `_invoke()` (i.e. no cid was provided in the
# msg - see above). Raise error inline and
# mark the channel as "globally errored" for
# all downstream consuming primitives.
case Error(): case Error():
# NOTE: this is the non-rpc error case,
# that is, an error **not** raised inside
# a call to ``_invoke()`` (i.e. no cid was
# provided in the msg - see above). Push
# this error to all local channel
# consumers (normally portals) by marking
# the channel as errored
# assert chan.uid
chan._exc: Exception = unpack_error( chan._exc: Exception = unpack_error(
msg, msg,
chan=chan, chan=chan,
@ -1114,7 +1111,7 @@ async def process_messages(
f'|_{chan.raddr}\n' f'|_{chan.raddr}\n'
) )
# transport **WAS** disconnected # transport **was** disconnected
return True return True
except ( except (
@ -1153,11 +1150,12 @@ async def process_messages(
finally: finally:
# msg debugging for when he machinery is brokey # msg debugging for when he machinery is brokey
log.runtime( log.runtime(
'Exiting IPC msg loop with final msg\n\n' 'Exiting IPC msg loop with\n'
f'<= peer: {chan.uid}\n' f'peer: {chan.uid}\n'
f'|_{chan}\n\n' f'|_{chan}\n\n'
f'{pformat(msg)}\n\n' 'final msg:\n'
f'{pformat(msg)}\n'
) )
# transport **WAS NOT** disconnected # transport **was not** disconnected
return False return False

View File

@ -850,7 +850,7 @@ class Actor:
msg_buffer_size: int|None = None, msg_buffer_size: int|None = None,
allow_overruns: bool = False, allow_overruns: bool = False,
load_nsf: bool = False, load_nsf: bool = False,
ack_timeout: float = float('inf'), ack_timeout: float = 3,
) -> Context: ) -> Context:
''' '''

View File

@ -37,11 +37,6 @@ from ._codec import (
MsgDec as MsgDec, MsgDec as MsgDec,
current_codec as current_codec, current_codec as current_codec,
) )
# currently can't bc circular with `._context`
# from ._ops import (
# PldRx as PldRx,
# _drain_to_final_msg as _drain_to_final_msg,
# )
from .types import ( from .types import (
Msg as Msg, Msg as Msg,

View File

@ -75,7 +75,7 @@ log = get_logger(__name__)
# TODO: unify with `MsgCodec` by making `._dec` part this? # TODO: unify with `MsgCodec` by making `._dec` part this?
class MsgDec(Struct): class MsgDec(Struct):
''' '''
An IPC msg (payload) decoder. An IPC msg decoder.
Normally used to decode only a payload: `MsgType.pld: Normally used to decode only a payload: `MsgType.pld:
PayloadT` field before delivery to IPC consumer code. PayloadT` field before delivery to IPC consumer code.
@ -87,31 +87,6 @@ class MsgDec(Struct):
def dec(self) -> msgpack.Decoder: def dec(self) -> msgpack.Decoder:
return self._dec return self._dec
def __repr__(self) -> str:
speclines: str = self.spec_str
# in multi-typed spec case we stick the list
# all on newlines after the |__pld_spec__:,
# OW it's prolly single type spec-value
# so just leave it on same line.
if '\n' in speclines:
speclines: str = '\n' + textwrap.indent(
speclines,
prefix=' '*3,
)
body: str = textwrap.indent(
f'|_dec_hook: {self.dec.dec_hook}\n'
f'|__pld_spec__: {speclines}\n',
prefix=' '*2,
)
return (
f'<{type(self).__name__}(\n'
f'{body}'
')>'
)
# struct type unions # struct type unions
# https://jcristharif.com/msgspec/structs.html#tagged-unions # https://jcristharif.com/msgspec/structs.html#tagged-unions
# #
@ -162,7 +137,17 @@ class MsgDec(Struct):
# TODO: would get moved into `FieldSpec.__str__()` right? # TODO: would get moved into `FieldSpec.__str__()` right?
@property @property
def spec_str(self) -> str: def spec_str(self) -> str:
return pformat_msgspec(codec=self)
# TODO: could also use match: instead?
spec: Union[Type]|Type = self.spec
# `typing.Union` case
if getattr(spec, '__args__', False):
return str(spec)
# just a single type
else:
return spec.__name__
pld_spec_str = spec_str pld_spec_str = spec_str
@ -183,58 +168,10 @@ def mk_dec(
) -> MsgDec: ) -> MsgDec:
return MsgDec( return msgpack.Decoder(
_dec=msgpack.Decoder(
type=spec, # like `Msg[Any]` type=spec, # like `Msg[Any]`
dec_hook=dec_hook, dec_hook=dec_hook,
) )
)
def mk_msgspec_table(
dec: msgpack.Decoder,
msg: MsgType|None = None,
) -> dict[str, MsgType]|str:
'''
Fill out a `dict` of `MsgType`s keyed by name
for a given input `msgspec.msgpack.Decoder`
as defined by its `.type: Union[Type]` setting.
If `msg` is provided, only deliver a `dict` with a single
entry for that type.
'''
msgspec: Union[Type]|Type = dec.type
if not (msgtypes := getattr(msgspec, '__args__', False)):
msgtypes = [msgspec]
msgt_table: dict[str, MsgType] = {
msgt: str(msgt)
for msgt in msgtypes
}
if msg:
msgt: MsgType = type(msg)
str_repr: str = msgt_table[msgt]
return {msgt: str_repr}
return msgt_table
def pformat_msgspec(
codec: MsgCodec|MsgDec,
msg: MsgType|None = None,
join_char: str = '\n',
) -> str:
dec: msgpack.Decoder = getattr(codec, 'dec', codec)
return join_char.join(
mk_msgspec_table(
dec=dec,
msg=msg,
).values()
)
# TODO: overall IPC msg-spec features (i.e. in this mod)! # TODO: overall IPC msg-spec features (i.e. in this mod)!
# #
@ -263,7 +200,7 @@ class MsgCodec(Struct):
def __repr__(self) -> str: def __repr__(self) -> str:
speclines: str = textwrap.indent( speclines: str = textwrap.indent(
pformat_msgspec(codec=self), self.pformat_msg_spec(),
prefix=' '*3, prefix=' '*3,
) )
body: str = textwrap.indent( body: str = textwrap.indent(
@ -307,11 +244,33 @@ class MsgCodec(Struct):
# NOTE: defined and applied inside `mk_codec()` # NOTE: defined and applied inside `mk_codec()`
return self._dec.type return self._dec.type
def msg_spec_items(
self,
msg: MsgType|None = None,
) -> dict[str, MsgType]|str:
msgt_table: dict[str, MsgType] = {
msgt: str(msgt)
for msgt in self.msg_spec.__args__
}
if msg:
msgt: MsgType = type(msg)
str_repr: str = msgt_table[msgt]
return {msgt: str_repr}
return msgt_table
# TODO: some way to make `pretty_struct.Struct` use this # TODO: some way to make `pretty_struct.Struct` use this
# wrapped field over the `.msg_spec` one? # wrapped field over the `.msg_spec` one?
@property def pformat_msg_spec(
def msg_spec_str(self) -> str: self,
return pformat_msgspec(self.msg_spec) msg: MsgType|None = None,
join_char: str = '\n',
) -> str:
return join_char.join(
self.msg_spec_items(msg=msg).values()
)
lib: ModuleType = msgspec lib: ModuleType = msgspec
@ -321,31 +280,16 @@ class MsgCodec(Struct):
def enc(self) -> msgpack.Encoder: def enc(self) -> msgpack.Encoder:
return self._enc return self._enc
# TODO: reusing encode buffer for perf?
# https://jcristharif.com/msgspec/perf-tips.html#reusing-an-output-buffer
_buf: bytearray = bytearray()
def encode( def encode(
self, self,
py_obj: Any, py_obj: Any,
use_buf: bool = False,
# ^-XXX-^ uhh why am i getting this?
# |_BufferError: Existing exports of data: object cannot be re-sized
) -> bytes: ) -> bytes:
''' '''
Encode input python objects to `msgpack` bytes for Encode input python objects to `msgpack` bytes for
transfer on a tranport protocol connection. transfer on a tranport protocol connection.
When `use_buf == True` use the output buffer optimization:
https://jcristharif.com/msgspec/perf-tips.html#reusing-an-output-buffer
''' '''
if use_buf:
self._enc.encode_into(py_obj, self._buf)
return self._buf
else:
return self._enc.encode(py_obj) return self._enc.encode(py_obj)
@property @property

View File

@ -146,11 +146,7 @@ class PldRx(Struct):
# sync-rx msg from underlying IPC feeder (mem-)chan # sync-rx msg from underlying IPC feeder (mem-)chan
ctx._rx_chan.receive_nowait() ctx._rx_chan.receive_nowait()
) )
return self.dec_msg( return self.dec_msg(msg)
msg,
ctx=ctx,
expect_msg=expect_msg,
)
async def recv_pld( async def recv_pld(
self, self,

View File

@ -102,59 +102,6 @@ def iter_fields(struct: Struct) -> Iterator[
) )
def pformat(
struct: Struct,
field_indent: int = 2,
indent: int = 0,
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
# global whitespace indent
ws: str = ' '*indent
# field whitespace indent
field_ws: str = ' '*(field_indent + indent)
# qtn: str = ws + struct.__class__.__qualname__
qtn: str = struct.__class__.__qualname__
obj_str: str = '' # accumulator
fi: structs.FieldInfo
k: str
v: Any
for fi, k, v in iter_fields(struct):
# TODO: how can we prefer `Literal['option1', 'option2,
# ..]` over .__name__ == `Literal` but still get only the
# latter for simple types like `str | int | None` etc..?
ft: type = fi.type
typ_name: str = getattr(ft, '__name__', str(ft))
# recurse to get sub-struct's `.pformat()` output Bo
if isinstance(v, Struct):
val_str: str = v.pformat(
indent=field_indent + indent,
field_indent=indent + field_indent,
)
else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
val_str: str = saferepr(v)
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
return (
f'{qtn}(\n'
f'{obj_str}'
f'{ws})'
)
class Struct( class Struct(
_Struct, _Struct,
@ -193,12 +140,65 @@ class Struct(
return sin_props return sin_props
pformat = pformat # TODO: make thisi a mod-func!
# __str__ = __repr__ = pformat def pformat(
self,
field_indent: int = 2,
indent: int = 0,
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
# global whitespace indent
ws: str = ' '*indent
# field whitespace indent
field_ws: str = ' '*(field_indent + indent)
# qtn: str = ws + self.__class__.__qualname__
qtn: str = self.__class__.__qualname__
obj_str: str = '' # accumulator
fi: structs.FieldInfo
k: str
v: Any
for fi, k, v in iter_fields(self):
# TODO: how can we prefer `Literal['option1', 'option2,
# ..]` over .__name__ == `Literal` but still get only the
# latter for simple types like `str | int | None` etc..?
ft: type = fi.type
typ_name: str = getattr(ft, '__name__', str(ft))
# recurse to get sub-struct's `.pformat()` output Bo
if isinstance(v, Struct):
val_str: str = v.pformat(
indent=field_indent + indent,
field_indent=indent + field_indent,
)
else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
val_str: str = saferepr(v)
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
return (
f'{qtn}(\n'
f'{obj_str}'
f'{ws})'
)
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering # TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty? # inside a known tty?
# def __repr__(self) -> str: # def __repr__(self) -> str:
# ... # ...
# __str__ = __repr__ = pformat
__repr__ = pformat __repr__ = pformat
def copy( def copy(