Compare commits

...

11 Commits

Author SHA1 Message Date
Tyler Goodlet a5a0e6854b Use new `Msg[Co]Dec` repr meths in `._exceptions`
Particularly when logging around `MsgTypeError`s.

Other:
- make `_raise_from_unexpected_msg()`'s `expect_msg` a non-default value
  arg, must always be passed by caller.
- drop `'canceller'` from `_body_fields` ow it shows up twice for ctxc.
- use `.msg.pretty_struct.pformat()`.
- parameterize `RemoteActorError.reprol()` (repr-one-line method) to
  show `RemoteActorError[<self.boxed_type_str>]( ..` to make obvi
  the boxed remote error type.
- re-impl `.boxed_type_str` as `str`-casting the `.boxed_type` value
  which is guaranteed to render non-`None`.
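For reference, the re-implemented property as it lands in the `._exceptions` diff below:

    @property
    def boxed_type_str(self) -> str:
        '''
        String-name of the (last hop's) boxed error type.

        '''
        bt: Type[BaseException] = self.boxed_type
        return str(bt.__name__)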
2024-04-26 13:09:38 -04:00
Tyler Goodlet c383978402 Add more useful `MsgDec.__repr__()`
Basically exact same as that for `MsgCodec` with the `.spec` displayed
via a better (maybe multi-line) `.spec_str: str` generated from a common
new set of helper mod funcs factored out msg-codec meths:
- `mk_msgspec_table()` to gen a `MsgType` name -> msg table.
- `pformat_msgspec()` to `str`-ify said table values nicely.

Also add a new `MsgCodec.msg_spec_str: str` prop which delegates to the
above for the same.
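A rough sketch of the idea; the helper names are from this commit but the bodies below are assumptions, not the actual impl:

    from typing import get_args

    def mk_msgspec_table(spec) -> dict[str, type]:
        # gen a `MsgType`-name -> msg-type table from a (possibly
        # union-of-structs) msg spec; `get_args()` unpacks a union.
        msgtypes: list[type] = list(get_args(spec)) or [spec]
        return {msgtype.__name__: msgtype for msgtype in msgtypes}

    def pformat_msgspec(spec) -> str:
        # `str`-ify said table's values nicely, one per line.
        return '\n'.join(
            f'{name}: {msgtype!r}'
            for name, msgtype in mk_msgspec_table(spec).items()
        )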
2024-04-26 12:49:37 -04:00
Tyler Goodlet 08fcd3fb03 Mk `.msg.pretty_struct.Struct.pformat()` a mod func
More along the lines of `msgspec.struct` and also far more useful
internally for pprinting `MsgTypes`. Of course add method aliases.
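A minimal sketch of the mod-func + method-alias pattern (using only `msgspec`'s public API; the real `pretty_struct` version renders much more detail):

    import msgspec

    def pformat(struct: msgspec.Struct) -> str:
        # multi-line render of any struct's fields
        fields: str = ',\n'.join(
            f'    {name}={getattr(struct, name)!r}'
            for name in struct.__struct_fields__
        )
        return f'{type(struct).__name__}(\n{fields}\n)'

    class Struct(msgspec.Struct):
        # method alias so existing `.pformat()` call sites keep working
        pformat = pformat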
2024-04-25 20:00:13 -04:00
Tyler Goodlet adba454d1d Use `Context.[peer_]side` in ctxc messages 2024-04-25 16:19:39 -04:00
Tyler Goodlet 4bab998ff9 Add `Context.peer_side: str` property, mk static-meth private. 2024-04-25 12:38:05 -04:00
Tyler Goodlet c25c77c573 Flip back `StartAck` timeout to `inf`.. 2024-04-25 12:36:14 -04:00
Tyler Goodlet 188ff0e0e5 Another `._rpc` mod passthrough
- tweaking logging to include more `MsgType` dumps on IPC faults.
- removing some commented cruft.
- comment formatting / cleanups / add-ons.
- more type annots.
- fill out some TODO content.
2024-04-25 12:33:10 -04:00
Tyler Goodlet 6b30c86eca Try out `msgspec` encode-buffer optimization
As per the reco:
https://jcristharif.com/msgspec/perf-tips.html#reusing-an-output-buffer

BUT, seems to cause this error in `pikerd`..

`BufferError: Existing exports of data: object cannot be re-sized`

Soo no idea? Maybe there's a tweak needed that we can glean from
tests/examples in the `msgspec` repo?

Disabling for now.
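For context, the gist of the reco'd pattern (a sketch using `trio`'s send-stream API, not the actual `._ipc` code); presumably the error means something still held an export (eg. a live `memoryview`) of the `bytearray` when the next `encode_into()` needed to re-size it:

    import msgspec
    import trio

    enc = msgspec.msgpack.Encoder()
    buf = bytearray(64)  # reused output buffer, grown in-place as needed

    async def send(stream: trio.abc.SendStream, obj) -> None:
        # encode into the existing buffer instead of allocating
        # a fresh `bytes` per msg..
        enc.encode_into(obj, buf)
        # ..BUT if an export of `buf` is still alive when a later
        # call must re-size it, you get:
        # `BufferError: Existing exports of data: object cannot be re-sized`
        await stream.send_all(buf)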
2024-04-24 13:07:05 -04:00
Tyler Goodlet 6aa52417ef Set `Context._stream` in `Portal.open_stream_from()`.. 2024-04-24 12:43:08 -04:00
Tyler Goodlet 18e97a8f9a Use `Context._stream` in `_raise_from_unexpected_msg()`
Instead of expecting it to be passed in (as it was prior), when
determining if a `Stop` msg is a valid end-of-channel signal use the
`ctx._stream: MsgStream|None` attr which **must** be set by any stream
opening API; either of:
- `Context.open_stream()`
- `Portal.open_stream_from()`

Adjust the case block logic to match with fallthrough from any EoC to
a closed error if necessary. Change the `_type: str` to match the
failing IPC-prim name in the tail case we raise a `MessagingError`.

Other:
- move `.sender: tuple` uid attr up to `RemoteActorError` since `Error`
  optionally defines it as a field and for boxed `StreamOverrun`s (an
  ignore case we check for in the runtime during cancellation) we want
  it readable from the boxing rae.
- drop still unused `InternalActorError`.
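The hoisted property, as it lands in the `._exceptions` diff below:

    @property
    def sender(self) -> tuple[str, str]|None:
        # only resolves when the boxing `Error` msg actually
        # filled its (optional) `.sender` field.
        if (
            (msg := self._ipc_msg)
            and (value := msg.sender)
        ):
            return tuple(value)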
2024-04-24 12:42:05 -04:00
Tyler Goodlet 5eb9144921 First draft "payload receiver" in a new `.msg._ops`
As per much tinkering, re-designs and preceding rubber-ducking via many
"commit msg novelas", **finally** this adds the (hopefully) final
missing layer for typed msg safety: `tractor.msg._ops.PldRx`

(or `PayloadReceiver`? haven't decided how verbose to go..)

Design justification summary:
      ------ - ------
- need a way to be as-close-as-possible to the `tractor`-application
  such that when `MsgType.pld: PayloadT` validation takes place, it is
  straightforward and obvious how user code can decide to handle any
  resulting `MsgTypeError`.
- there should be a common and optional-yet-modular way to modify
  **how** data delivered via IPC (possibly embedded as user defined,
  type-constrained `.pld: msgspec.Struct`s) can be handled and processed
  during fault conditions and/or IPC "msg attacks".
- support for nested type constraints within a `MsgType.pld` field
  should be simple to define, implement and understand at runtime.
- a layer between the app-level IPC primitive APIs
  (`Context`/`MsgStream`) and application-task code (consumer code of
  those APIs) should be easily customized and prove-to-be-as-such
  through demonstrably rigorous internal (sub-sys) use!
  -> eg. via seamless runtime RPC eps support like `Actor.cancel()`
  -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt
    dialog prot, via a dead simple payload-as-ctl-msg-spec.

There are some fairly detailed doc strings included so I won't duplicate
that content, the majority of the work here is actually somewhat of
a factoring of many similar blocks that are doing more or less the same
`msg = await Context._rx_chan.receive()` with boilerplate for
`Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new
`PldRx` basically provides a shim layer for this common "receive msg,
decode its payload, yield it up to the consuming app task" by pairing
the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API
internals to use **one** API instead of re-implementing the same pattern
all over the place XD

`PldRx` breakdown
 ------ - ------
- for now only expects a `._msgdec: MsgDec` which allows for
  override-able `MsgType.pld` validation and most obviously used in
  the impl of `.dec_msg()`, the decode message method.
- provides multiple mem-chan receive options including:
 |_ `.recv_pld()` which does the e2e operation of receiving a payload
    item.
 |_ a sync `.recv_pld_nowait()` version.
 |_ a `.recv_msg_w_pld()` which optionally allows retrieving both the
    shuttling `MsgType` as well as its `.pld` body for use cases where
    info on both is important (eg. draining a `MsgStream`).
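A condensed sketch of the shape of the thing; `MsgDec`, `MsgType` and `Context` are the internal types referenced above and the bodies are simplified assumptions (the real impl also handles `Error`/`Stop` translation and an `expect_msg` param):

    from __future__ import annotations
    from typing import Any

    class PldRx:
        def __init__(self, msgdec: MsgDec) -> None:
            # override-able `MsgType.pld` validator/decoder
            self._msgdec = msgdec

        def dec_msg(self, msg: MsgType) -> Any:
            # decode (and thus type-check) the shuttle msg's payload
            return self._msgdec.decode(msg.pld)

        async def recv_msg_w_pld(self, ctx: Context) -> tuple[MsgType, Any]:
            # deliver both the shuttling `MsgType` and its `.pld`,
            # eg. for draining a `MsgStream`.
            msg: MsgType = await ctx._rx_chan.receive()
            return msg, self.dec_msg(msg)

        async def recv_pld(self, ctx: Context) -> Any:
            # the e2e op: recv from the paired RPC feeder mem-chan,
            # decode the payload, yield it up to the app task.
            _, pld = await self.recv_msg_w_pld(ctx)
            return pld

        def recv_pld_nowait(self, ctx: Context) -> Any:
            # sync version of the above
            msg: MsgType = ctx._rx_chan.receive_nowait()
            return self.dec_msg(msg)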

Dirty internal changeover/implementation deatz:
             ------ - ------
- obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield
  logic:
 - `MsgStream.receive[_nowait]()` delegating instead to the equivalent
   `PldRx.recv_pld[_nowait]()`.
 - add `Context._pld_rx: PldRx`, created and passed in by
   `mk_context()`; use it for the `.started()` -> `first: Started`
   retrieval inside `open_context_from_portal()`.
 - all the relevant `Portal` invocation methods: `.result()`,
   `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()`
   and `Portal._return_once()` outright Bo
- rename `Context.ctx._recv_chan` -> `._rx_chan`.
- add detailed `Context._scope` info for logging whether or not it's
  cancelled inside `_maybe_cancel_and_set_remote_error()`.
- move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()`
  since it's really not necessarily ctx specific per se, and it does
  kinda fit with "msg operations" more abstractly ;)
2024-04-24 00:59:41 -04:00
10 changed files with 1081 additions and 746 deletions

View File

@@ -25,26 +25,31 @@ disjoint, parallel executing tasks in separate actors.
 '''
 from __future__ import annotations
 from collections import deque
-from contextlib import asynccontextmanager as acm
-from contextvars import ContextVar
+from contextlib import (
+    asynccontextmanager as acm,
+)
 from dataclasses import (
     dataclass,
     field,
 )
 from functools import partial
 import inspect
-import msgspec
 from pprint import pformat
 from typing import (
     Any,
     Callable,
     AsyncGenerator,
+    Type,
     TYPE_CHECKING,
+    Union,
 )
 import warnings
+# ------ - ------
 import trio
+from msgspec import (
+    ValidationError,
+)
+# ------ - ------
 from ._exceptions import (
     ContextCancelled,
     InternalError,
@@ -53,7 +58,6 @@ from ._exceptions import (
     StreamOverrun,
     pack_from_raise,
     unpack_error,
-    _raise_from_no_key_in_msg,
 )
 from .log import get_logger
 from .msg import (
@@ -70,8 +74,12 @@ from .msg import (
     current_codec,
     pretty_struct,
     types as msgtypes,
+    _ops as msgops,
 )
-from ._ipc import Channel
+from ._ipc import (
+    Channel,
+    _mk_msg_type_err,
+)
 from ._streaming import MsgStream
 from ._state import (
     current_actor,
@@ -86,294 +94,9 @@ if TYPE_CHECKING:
     CallerInfo,
 )

 log = get_logger(__name__)

-async def _drain_to_final_msg(
-    ctx: Context,
-    hide_tb: bool = True,
-    msg_limit: int = 6,
-
-) -> tuple[
-    Return|None,
-    list[MsgType]
-]:
-    '''
-    Drain IPC msgs delivered to the underlying rx-mem-chan
-    `Context._recv_chan` from the runtime in search for a final
-    result or error msg.
-
-    The motivation here is to ideally capture errors during ctxc
-    conditions where a canc-request/or local error is sent but the
-    local task also excepts and enters the
-    `Portal.open_context().__aexit__()` block wherein we prefer to
-    capture and raise any remote error or ctxc-ack as part of the
-    `ctx.result()` cleanup and teardown sequence.
-
-    '''
-    __tracebackhide__: bool = hide_tb
-    raise_overrun: bool = not ctx._allow_overruns
-
-    # wait for a final context result by collecting (but
-    # basically ignoring) any bi-dir-stream msgs still in transit
-    # from the far end.
-    pre_result_drained: list[MsgType] = []
-    return_msg: Return|None = None
-    while not (
-        ctx.maybe_error
-        and not ctx._final_result_is_set()
-    ):
-        try:
-            # TODO: can remove?
-            # await trio.lowlevel.checkpoint()
-            # NOTE: this REPL usage actually works here dawg! Bo
-            # from .devx._debug import pause
-            # await pause()
-
-            # TODO: bad idea?
-            # -[ ] wrap final outcome channel wait in a scope so
-            # it can be cancelled out of band if needed?
-            #
-            # with trio.CancelScope() as res_cs:
-            #     ctx._res_scope = res_cs
-            #     msg: dict = await ctx._recv_chan.receive()
-            # if res_cs.cancelled_caught:
-
-            # TODO: ensure there's no more hangs, debugging the
-            # runtime pretty preaase!
-            # from .devx._debug import pause
-            # await pause()
-
-            # TODO: can remove this finally?
-            # we have no more need for the sync draining right
-            # since we're can kinda guarantee the async
-            # `.receive()` below will never block yah?
-            #
-            # if (
-            #     ctx._cancel_called and (
-            #         ctx.cancel_acked
-            #         # or ctx.chan._cancel_called
-            #     )
-            #     # or not ctx._final_result_is_set()
-            #     # ctx.outcome is not
-            #     # or ctx.chan._closed
-            # ):
-            #     try:
-            #         msg: dict = await ctx._recv_chan.receive_nowait()()
-            #     except trio.WouldBlock:
-            #         log.warning(
-            #             'When draining already `.cancel_called` ctx!\n'
-            #             'No final msg arrived..\n'
-            #         )
-            #         break
-            # else:
-            #     msg: dict = await ctx._recv_chan.receive()
-
-            # TODO: don't need it right jefe?
-            # with trio.move_on_after(1) as cs:
-            # if cs.cancelled_caught:
-            #     from .devx._debug import pause
-            #     await pause()
-
-            # pray to the `trio` gawds that we're corrent with this
-            # msg: dict = await ctx._recv_chan.receive()
-            msg: MsgType = await ctx._recv_chan.receive()
-
-        # NOTE: we get here if the far end was
-        # `ContextCancelled` in 2 cases:
-        # 1. we requested the cancellation and thus
-        #    SHOULD NOT raise that far end error,
-        # 2. WE DID NOT REQUEST that cancel and thus
-        #    SHOULD RAISE HERE!
-        except trio.Cancelled:
-
-            # CASE 2: mask the local cancelled-error(s)
-            # only when we are sure the remote error is
-            # the source cause of this local task's
-            # cancellation.
-            ctx.maybe_raise()
-
-            # CASE 1: we DID request the cancel we simply
-            # continue to bubble up as normal.
-            raise
-
-        match msg:
-
-            # final result arrived!
-            case Return(
-                # cid=cid,
-                pld=res,
-            ):
-                ctx._result: Any = res
-                log.runtime(
-                    'Context delivered final draining msg:\n'
-                    f'{pformat(msg)}'
-                )
-                # XXX: only close the rx mem chan AFTER
-                # a final result is retreived.
-                # if ctx._recv_chan:
-                #     await ctx._recv_chan.aclose()
-                # TODO: ^ we don't need it right?
-                return_msg = msg
-                break
-
-            # far end task is still streaming to us so discard
-            # and report depending on local ctx state.
-            case Yield():
-                pre_result_drained.append(msg)
-                if (
-                    (ctx._stream.closed
-                     and (reason := 'stream was already closed')
-                    )
-                    or (ctx.cancel_acked
-                        and (reason := 'ctx cancelled other side')
-                    )
-                    or (ctx._cancel_called
-                        and (reason := 'ctx called `.cancel()`')
-                    )
-                    or (len(pre_result_drained) > msg_limit
-                        and (reason := f'"yield" limit={msg_limit}')
-                    )
-                ):
-                    log.cancel(
-                        'Cancelling `MsgStream` drain since '
-                        f'{reason}\n\n'
-                        f'<= {ctx.chan.uid}\n'
-                        f'  |_{ctx._nsf}()\n\n'
-                        f'=> {ctx._task}\n'
-                        f'  |_{ctx._stream}\n\n'
-                        f'{pformat(msg)}\n'
-                    )
-                    return (
-                        return_msg,
-                        pre_result_drained,
-                    )
-
-                # drain up to the `msg_limit` hoping to get
-                # a final result or error/ctxc.
-                else:
-                    log.warning(
-                        'Ignoring "yield" msg during `ctx.result()` drain..\n'
-                        f'<= {ctx.chan.uid}\n'
-                        f'  |_{ctx._nsf}()\n\n'
-                        f'=> {ctx._task}\n'
-                        f'  |_{ctx._stream}\n\n'
-                        f'{pformat(msg)}\n'
-                    )
-                    continue
-
-            # stream terminated, but no result yet..
-            #
-            # TODO: work out edge cases here where
-            # a stream is open but the task also calls
-            # this?
-            # -[ ] should be a runtime error if a stream is open right?
-            # Stop()
-            case Stop():
-                pre_result_drained.append(msg)
-                log.cancel(
-                    'Remote stream terminated due to "stop" msg:\n\n'
-                    f'{pformat(msg)}\n'
-                )
-                continue
-
-            # remote error msg, likely already handled inside
-            # `Context._deliver_msg()`
-            case Error():
-                # TODO: can we replace this with `ctx.maybe_raise()`?
-                # -[ ] would this be handier for this case maybe?
-                # async with maybe_raise_on_exit() as raises:
-                #     if raises:
-                #         log.error('some msg about raising..')
-                #
-                re: Exception|None = ctx._remote_error
-                if re:
-                    assert msg is ctx._cancel_msg
-                    # NOTE: this solved a super duper edge case XD
-                    # this was THE super duper edge case of:
-                    # - local task opens a remote task,
-                    # - requests remote cancellation of far end
-                    #   ctx/tasks,
-                    # - needs to wait for the cancel ack msg
-                    #   (ctxc) or some result in the race case
-                    #   where the other side's task returns
-                    #   before the cancel request msg is ever
-                    #   rxed and processed,
-                    # - here this surrounding drain loop (which
-                    #   iterates all ipc msgs until the ack or
-                    #   an early result arrives) was NOT exiting
-                    #   since we are the edge case: local task
-                    #   does not re-raise any ctxc it receives
-                    #   IFF **it** was the cancellation
-                    #   requester..
-                    #
-                    # XXX will raise if necessary but ow break
-                    # from loop presuming any supressed error
-                    # (ctxc) should terminate the context!
-                    ctx._maybe_raise_remote_err(
-                        re,
-                        # NOTE: obvi we don't care if we
-                        # overran the far end if we're already
-                        # waiting on a final result (msg).
-                        # raise_overrun_from_self=False,
                        raise_overrun_from_self=raise_overrun,
-                    )
-
-                    break  # OOOOOF, yeah obvi we need this..
-
-                # XXX we should never really get here
-                # right! since `._deliver_msg()` should
-                # always have detected an {'error': ..}
-                # msg and already called this right!?!
-                elif error := unpack_error(
-                    msg=msg,
-                    chan=ctx._portal.channel,
-                    hide_tb=False,
-                ):
-                    log.critical('SHOULD NEVER GET HERE!?')
-                    assert msg is ctx._cancel_msg
-                    assert error.msgdata == ctx._remote_error.msgdata
-                    assert error.ipc_msg == ctx._remote_error.ipc_msg
-                    from .devx._debug import pause
-                    await pause()
-                    ctx._maybe_cancel_and_set_remote_error(error)
-                    ctx._maybe_raise_remote_err(error)
-
-                else:
-                    # bubble the original src key error
-                    raise
-
-            # XXX should pretty much never get here unless someone
-            # overrides the default `MsgType` spec.
-            case _:
-                pre_result_drained.append(msg)
-                # It's definitely an internal error if any other
-                # msg type without a`'cid'` field arrives here!
-                if not msg.cid:
-                    raise InternalError(
-                        'Unexpected cid-missing msg?\n\n'
-                        f'{msg}\n'
-                    )
-
-                raise RuntimeError('Unknown msg type: {msg}')
-
-    else:
-        log.cancel(
-            'Skipping `MsgStream` drain since final outcome is set\n\n'
-            f'{ctx.outcome}\n'
-        )
-
-    return (
-        return_msg,
-        pre_result_drained,
-    )

 class Unresolved:
     '''
     Placeholder value for `Context._result` until
@@ -423,9 +146,12 @@ class Context:
     # the "feeder" channels for delivering message values to the
     # local task from the runtime's msg processing loop.
-    _recv_chan: trio.MemoryReceiveChannel
+    _rx_chan: trio.MemoryReceiveChannel
     _send_chan: trio.MemorySendChannel

+    # payload receiver
+    _pld_rx: msgops.PldRx
+
     # full "namespace-path" to target RPC function
     _nsf: NamespacePath
@@ -447,7 +173,7 @@ class Context:
     _task: trio.lowlevel.Task|None = None

     # TODO: cs around result waiting so we can cancel any
-    # permanently blocking `._recv_chan.receive()` call in
+    # permanently blocking `._rx_chan.receive()` call in
     # a drain loop?
     # _res_scope: trio.CancelScope|None = None
@@ -504,14 +230,6 @@ class Context:
     _started_called: bool = False
     _stream_opened: bool = False
     _stream: MsgStream|None = None
-    _pld_codec_var: ContextVar[MsgCodec] = ContextVar(
-        'pld_codec',
-        default=_codec._def_msgspec_codec,  # i.e. `Any`-payloads
-    )
-
-    @property
-    def pld_codec(self) -> MsgCodec|None:
-        return self._pld_codec_var.get()

     # caller of `Portal.open_context()` for
     # logging purposes mostly
@@ -754,13 +472,17 @@ class Context:
         return 'parent' if self._portal else 'child'

     @staticmethod
-    def peer_side(side: str) -> str:
+    def _peer_side(side: str) -> str:
         match side:
             case 'child':
                 return 'parent'
             case 'parent':
                 return 'child'

+    @property
+    def peer_side(self) -> str:
+        return self._peer_side(self.side)
+
     # TODO: remove stat!
     # -[ ] re-implement the `.experiemental._pubsub` stuff
     #     with `MsgStream` and that should be last usage?
@@ -794,9 +516,7 @@ class Context:
         equiv of a `StopIteration`.

         '''
-        await self.chan.send(
-            Stop(cid=self.cid)
-        )
+        await self.chan.send(Stop(cid=self.cid))

     def _maybe_cancel_and_set_remote_error(
         self,
@@ -875,7 +595,6 @@ class Context:
         # TODO: never do this right?
         # if self._remote_error:
         #     return
-        peer_side: str = self.peer_side(self.side)

         # XXX: denote and set the remote side's error so that
         # after we cancel whatever task is the opener of this
@@ -883,7 +602,7 @@ class Context:
         # appropriately.
         log.runtime(
             'Setting remote error for ctx\n\n'
-            f'<= {peer_side!r}: {self.chan.uid}\n'
+            f'<= {self.peer_side!r}: {self.chan.uid}\n'
             f'=> {self.side!r}\n\n'
             f'{error}'
         )
@@ -905,9 +624,8 @@ class Context:
         elif isinstance(error, MsgTypeError):
             msgerr = True

-            peer_side: str = self.peer_side(self.side)
             log.error(
-                f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n'
+                f'IPC dialog error due to msg-type caused by {self.peer_side!r} side\n\n'
                 f'{error}\n'
                 f'{pformat(self)}\n'
             )
@@ -916,9 +634,8 @@ class Context:
         else:
             log.error(
                 f'Remote context error:\n\n'
-                # f'{pformat(self)}\n'
                 f'{error}\n'
+                f'{pformat(self)}\n'
             )

         # always record the cancelling actor's uid since its
@@ -955,24 +672,49 @@ class Context:
             and not self._is_self_cancelled()
             and not cs.cancel_called
             and not cs.cancelled_caught
-            and (
-                msgerr
-                and
-                # NOTE: allow user to config not cancelling the
-                # local scope on `MsgTypeError`s
-                self._cancel_on_msgerr
-            )
         ):
-            # TODO: it'd sure be handy to inject our own
-            # `trio.Cancelled` subtype here ;)
-            # https://github.com/goodboy/tractor/issues/368
-            log.cancel('Cancelling local `.open_context()` scope!')
-            self._scope.cancel()
+            if not (
+                msgerr
+
+                # NOTE: we allow user to config not cancelling the
+                # local scope on `MsgTypeError`s
+                and not self._cancel_on_msgerr
+            ):
+                # TODO: it'd sure be handy to inject our own
+                # `trio.Cancelled` subtype here ;)
+                # https://github.com/goodboy/tractor/issues/368
+                message: str = 'Cancelling `Context._scope` !\n\n'
+                self._scope.cancel()
+            else:
+                message: str = (
+                    'NOT Cancelling `Context._scope` since,\n'
+                    f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
+                    f'AND we got a msg-type-error!\n'
+                    f'{error}\n'
+                )
         else:
-            log.cancel('NOT cancelling local `.open_context()` scope!')
+            message: str = 'NOT cancelling `Context._scope` !\n\n'
+
+        scope_info: str = 'No `self._scope: CancelScope` was set/used ?'
+        if cs:
+            scope_info: str = (
+                f'self._scope: {cs}\n'
+                f'|_ .cancel_called: {cs.cancel_called}\n'
+                f'|_ .cancelled_caught: {cs.cancelled_caught}\n'
+                f'|_ ._cancel_status: {cs._cancel_status}\n\n'
+                f'{self}\n'
+                f'|_ ._is_self_cancelled(): {self._is_self_cancelled()}\n'
+                f'|_ ._cancel_on_msgerr: {self._cancel_on_msgerr}\n\n'
+                f'msgerr: {msgerr}\n'
+            )
+        log.cancel(
+            message
+            +
+            f'{scope_info}'
+        )

     # TODO: maybe we should also call `._res_scope.cancel()` if it
     # exists to support cancelling any drain loop hangs?
@@ -1256,7 +998,7 @@ class Context:
         # a ``.open_stream()`` block prior or there was some other
         # unanticipated error or cancellation from ``trio``.

-        if ctx._recv_chan._closed:
+        if ctx._rx_chan._closed:
             raise trio.ClosedResourceError(
                 'The underlying channel for this stream was already closed!\n'
             )
@@ -1276,7 +1018,7 @@ class Context:
         # stream WAS NOT just closed normally/gracefully.
         async with MsgStream(
             ctx=self,
-            rx_chan=ctx._recv_chan,
+            rx_chan=ctx._rx_chan,
         ) as stream:

             # NOTE: we track all existing streams per portal for
@@ -1325,12 +1067,12 @@ class Context:
             except trio.EndOfChannel as eoc:
                 if (
                     eoc
-                    and stream.closed
+                    and
+                    stream.closed
                 ):
                     # sanity, can remove?
                     assert eoc is stream._eoc
-                    # from .devx import pause
-                    # await pause()
+
                     log.warning(
                         'Stream was terminated by EoC\n\n'
                         # NOTE: won't show the error <Type> but
@@ -1427,13 +1169,12 @@ class Context:
                 # boxed `StreamOverrun`. This is mostly useful for
                 # supressing such faults during
                 # cancellation/error/final-result handling inside
-                # `_drain_to_final_msg()` such that we do not
+                # `msg._ops.drain_to_final_msg()` such that we do not
                 # raise such errors particularly in the case where
                 # `._cancel_called == True`.
                 not raise_overrun_from_self
                 and isinstance(remote_error, RemoteActorError)
-                and remote_error.boxed_type is StreamOverrun
+                and remote_error.boxed_type_str == 'StreamOverrun'

                 # and tuple(remote_error.msgdata['sender']) == our_uid
                 and tuple(remote_error.sender) == our_uid
@@ -1503,12 +1244,12 @@ class Context:
         if self._final_result_is_set():
             return self._result

-        assert self._recv_chan
+        assert self._rx_chan
         raise_overrun: bool = not self._allow_overruns
         if (
             self.maybe_error is None
             and
-            not self._recv_chan._closed  # type: ignore
+            not self._rx_chan._closed  # type: ignore
         ):
             # wait for a final context result/error by "draining"
             # (by more or less ignoring) any bi-dir-stream "yield"
@@ -1516,7 +1257,7 @@ class Context:
             (
                 return_msg,
                 drained_msgs,
-            ) = await _drain_to_final_msg(
+            ) = await msgops.drain_to_final_msg(
                 ctx=self,
                 hide_tb=hide_tb,
             )
@@ -1802,8 +1543,7 @@ class Context:
             await self.chan.send(started_msg)

         # raise any msg type error NO MATTER WHAT!
-        except msgspec.ValidationError as verr:
-            from tractor._ipc import _mk_msg_type_err
+        except ValidationError as verr:
             raise _mk_msg_type_err(
                 msg=msg_bytes,
                 codec=codec,
@@ -1890,7 +1630,7 @@ class Context:
        - NEVER `return` early before delivering the msg!
          bc if the error is a ctxc and there is a task waiting on
          `.result()` we need the msg to be
-         `send_chan.send_nowait()`-ed over the `._recv_chan` so
+         `send_chan.send_nowait()`-ed over the `._rx_chan` so
          that the error is relayed to that waiter task and thus
          raised in user code!
@@ -1904,10 +1644,9 @@ class Context:
         side: str = self.side
         if side == 'child':
             assert not self._portal
-            peer_side: str = self.peer_side(side)

         flow_body: str = (
-            f'<= peer {peer_side!r}: {from_uid}\n'
+            f'<= peer {self.peer_side!r}: {from_uid}\n'
             f'  |_<{nsf}()>\n\n'

             f'=> {side!r}: {self._task}\n'
@@ -1925,7 +1664,7 @@ class Context:
             log_meth = log.runtime

         log_meth(
-            f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n'
+            f'Delivering IPC ctx error from {self.peer_side!r} to {side!r} task\n\n'

             f'{flow_body}'
@@ -2201,24 +1940,11 @@ async def open_context_from_portal(
         # -> it's expected that if there is an error in this phase of
         # the dialog, the `Error` msg should be raised from the `msg`
         # handling block below.
-        msg: Started = await ctx._recv_chan.receive()
-        try:
-            # the "first" value here is delivered by the callee's
-            # ``Context.started()`` call.
-            # first: Any = msg['started']
-            first: Any = msg.pld
-            ctx._started_called: bool = True
-
-        # except KeyError as src_error:
-        except AttributeError as src_error:
-            log.exception('Raising from unexpected msg!\n')
-            _raise_from_no_key_in_msg(
-                ctx=ctx,
-                msg=msg,
-                src_err=src_error,
-                log=log,
-                expect_msg=Started,
-            )
+        first: Any = await ctx._pld_rx.recv_pld(
+            ctx=ctx,
+            expect_msg=Started,
+        )
+        ctx._started_called: bool = True

         uid: tuple = portal.channel.uid
         cid: str = ctx.cid
@@ -2540,7 +2266,7 @@ async def open_context_from_portal(
         # we tear down the runtime feeder chan last
         # to avoid premature stream clobbers.
         if (
-            (rxchan := ctx._recv_chan)
+            (rxchan := ctx._rx_chan)

             # maybe TODO: yes i know the below check is
             # touching `trio` memchan internals..BUT, there are
@@ -2583,7 +2309,7 @@ async def open_context_from_portal(
             # underlying feeder channel is
             # once-and-only-CLOSED!
             with trio.CancelScope(shield=True):
-                await ctx._recv_chan.aclose()
+                await ctx._rx_chan.aclose()

         # XXX: we always raise remote errors locally and
         # generally speaking mask runtime-machinery related
@@ -2603,7 +2329,7 @@ async def open_context_from_portal(
             and ctx.cancel_acked
         ):
             log.cancel(
-                'Context cancelled by caller task\n'
+                'Context cancelled by {ctx.side!r}-side task\n'
                 f'|_{ctx._task}\n\n'

                 f'{repr(scope_err)}\n'
@@ -2628,21 +2354,23 @@ async def open_context_from_portal(
         # FINALLY, remove the context from runtime tracking and
         # exit!
         log.runtime(
-            'Removing IPC ctx opened with peer\n'
-            f'{uid}\n'
-            f'|_{ctx}\n'
+            'De-allocating IPC ctx opened with {ctx.side!r} peer \n'
+            f'uid: {uid}\n'
+            f'cid: {ctx.cid}\n'
         )
         portal.actor._contexts.pop(
            (uid, cid),
            None,
        )


 def mk_context(
     chan: Channel,
     cid: str,
     nsf: NamespacePath,
     msg_buffer_size: int = 2**6,
+    pld_spec: Union[Type] = Any,

     **kwargs,
@@ -2662,12 +2390,18 @@ def mk_context(
     from .devx._code import find_caller_info
     caller_info: CallerInfo|None = find_caller_info()

+    pld_rx = msgops.PldRx(
+        # _rx_mc=recv_chan,
+        _msgdec=_codec.mk_dec(spec=pld_spec)
+    )
+
     ctx = Context(
         chan=chan,
         cid=cid,
         _actor=current_actor(),
         _send_chan=send_chan,
-        _recv_chan=recv_chan,
+        _rx_chan=recv_chan,
+        _pld_rx=pld_rx,
         _nsf=nsf,
         _task=trio.lowlevel.current_task(),
         _caller_info=caller_info,

View File

@@ -54,6 +54,7 @@ from tractor.msg import (
 from tractor.msg.pretty_struct import (
     iter_fields,
     Struct,
+    pformat as struct_format,
 )

 if TYPE_CHECKING:
@@ -108,6 +109,10 @@ _body_fields: list[str] = list(
         'relay_path',
         '_msg_dict',
         'cid',
+
+        # since only ctxc should show it but `Error` does
+        # have it as an optional field.
+        'canceller',
     }
 )
@@ -382,6 +387,9 @@ class RemoteActorError(Exception):
         '''
         Error type raised by original remote faulting actor.

+        When the error has only been relayed a single actor-hop
+        this will be the same as the `.boxed_type`.
+
         '''
         if self._src_type is None:
             self._src_type = get_err_type(
@@ -396,7 +404,8 @@ class RemoteActorError(Exception):
         String-name of the (last hop's) boxed error type.

         '''
-        return self._ipc_msg.boxed_type_str
+        bt: Type[BaseException] = self.boxed_type
+        return str(bt.__name__)

     @property
     def boxed_type(self) -> str:
@@ -492,7 +501,11 @@ class RemoteActorError(Exception):
         '''
         # TODO: use this matryoshka emjoi XD
         # => 🪆
-        reprol_str: str = f'{type(self).__name__}('
+        reprol_str: str = (
+            f'{type(self).__name__}'  # type name
+            f'[{self.boxed_type_str}]'  # parameterized by boxed type
+            '('  # init-style look
+        )
         _repr: str = self._mk_fields_str(
             self.reprol_fields,
             end_char=' ',
@@ -532,7 +545,8 @@ class RemoteActorError(Exception):
         self,
     ) -> BaseException:
         '''
-        Unpack the inner-most source error from it's original IPC msg data.
+        Unpack the inner-most source error from it's original IPC
+        msg data.

         We attempt to reconstruct (as best as we can) the original
         `Exception` from as it would have been raised in the
@@ -570,6 +584,14 @@ class RemoteActorError(Exception):
     #     # boxed_type=get_type_ref(..
     #     raise NotImplementedError

+    @property
+    def sender(self) -> tuple[str, str]|None:
+        if (
+            (msg := self._ipc_msg)
+            and (value := msg.sender)
+        ):
+            return tuple(value)
+

 class ContextCancelled(RemoteActorError):
     '''
@@ -644,8 +666,8 @@ class MsgTypeError(
       - `Yield`
       - TODO: any embedded `.pld` type defined by user code?

-    Normally the source of an error is re-raised from some `.msg._codec`
-    decode which itself raises in a backend interchange
+    Normally the source of an error is re-raised from some
+    `.msg._codec` decode which itself raises in a backend interchange
     lib (eg. a `msgspec.ValidationError`).

     '''
@@ -734,20 +756,6 @@ class StreamOverrun(
     handled by app code using `MsgStream.send()/.receive()`.

     '''
-    @property
-    def sender(self) -> tuple[str, str] | None:
-        value = self._ipc_msg.sender
-        if value:
-            return tuple(value)
-
-
-# class InternalActorError(RemoteActorError):
-#     '''
-#     Boxed (Remote) internal `tractor` error indicating failure of some
-#     primitive, machinery state or lowlevel task that should never
-#     occur.
-#     '''


 class TransportClosed(trio.ClosedResourceError):
@@ -944,8 +952,7 @@ def _raise_from_unexpected_msg(
     src_err: AttributeError,
     log: StackLevelAdapter,  # caller specific `log` obj

-    expect_msg: str = Yield,
-    stream: MsgStream | None = None,
+    expect_msg: Type[MsgType],

     # allow "deeper" tbs when debugging B^o
     hide_tb: bool = True,
@@ -987,6 +994,8 @@ def _raise_from_unexpected_msg(
     ) from src_err

     # TODO: test that shows stream raising an expected error!!!
+    stream: MsgStream|None
+    _type: str = 'Context'

     # raise the error message in a boxed exception type!
     if isinstance(msg, Error):
@@ -1003,59 +1012,54 @@ def _raise_from_unexpected_msg(
     # TODO: does it make more sense to pack
     # the stream._eoc outside this in the calleer always?
     # case Stop():
-    elif (
-        isinstance(msg, Stop)
-        or (
-            stream
-            and stream._eoc
-        )
-    ):
-        log.debug(
-            f'Context[{cid}] stream was stopped by remote side\n'
-            f'cid: {cid}\n'
-        )
+    elif stream := ctx._stream:
+        _type: str = 'MsgStream'

-        # TODO: if the a local task is already blocking on
-        # a `Context.result()` and thus a `.receive()` on the
-        # rx-chan, we close the chan and set state ensuring that
-        # an eoc is raised!
+        if (
+            stream._eoc
+            or
+            isinstance(msg, Stop)
+        ):
+            log.debug(
+                f'Context[{cid}] stream was stopped by remote side\n'
+                f'cid: {cid}\n'
+            )

-        # XXX: this causes ``ReceiveChannel.__anext__()`` to
-        # raise a ``StopAsyncIteration`` **and** in our catch
-        # block below it will trigger ``.aclose()``.
-        eoc = trio.EndOfChannel(
-            f'Context stream ended due to msg:\n\n'
-            f'{pformat(msg)}\n'
-        )
-        # XXX: important to set so that a new `.receive()`
-        # call (likely by another task using a broadcast receiver)
-        # doesn't accidentally pull the `return` message
-        # value out of the underlying feed mem chan which is
-        # destined for the `Context.result()` call during ctx-exit!
-        stream._eoc: Exception = eoc
+            # TODO: if the a local task is already blocking on
+            # a `Context.result()` and thus a `.receive()` on the
+            # rx-chan, we close the chan and set state ensuring that
+            # an eoc is raised!

-        # in case there already is some underlying remote error
-        # that arrived which is probably the source of this stream
-        # closure
-        ctx.maybe_raise()
-        raise eoc from src_err
+            # XXX: this causes ``ReceiveChannel.__anext__()`` to
+            # raise a ``StopAsyncIteration`` **and** in our catch
+            # block below it will trigger ``.aclose()``.
+            eoc = trio.EndOfChannel(
+                f'Context stream ended due to msg:\n\n'
+                f'{pformat(msg)}\n'
+            )
+            # XXX: important to set so that a new `.receive()`
+            # call (likely by another task using a broadcast receiver)
+            # doesn't accidentally pull the `return` message
+            # value out of the underlying feed mem chan which is
+            # destined for the `Context.result()` call during ctx-exit!
+            stream._eoc: Exception = eoc

-    if (
-        stream
-        and stream._closed
-    ):
-        # TODO: our own error subtype?
-        raise trio.ClosedResourceError(
-            'This stream was closed'
-        )
+            # in case there already is some underlying remote error
+            # that arrived which is probably the source of this stream
+            # closure
+            ctx.maybe_raise()
+            raise eoc from src_err
+
+        # TODO: our own transport/IPC-broke error subtype?
+        if stream._closed:
+            raise trio.ClosedResourceError('This stream was closed')

     # always re-raise the source error if no translation error case
     # is activated above.
-    _type: str = 'Stream' if stream else 'Context'
     raise MessagingError(
-        f"{_type} was expecting a {expect_msg} message"
-        " BUT received a non-error msg:\n"
-        f'{pformat(msg)}'
+        f'{_type} was expecting a {expect_msg.__name__!r} message'
+        ' BUT received a non-error msg:\n\n'
+        f'{struct_format(msg)}'
     ) from src_err
@@ -1088,13 +1092,11 @@ def _mk_msg_type_err(
     # no src error from `msgspec.msgpack.Decoder.decode()` so
     # prolly a manual type-check on our part.
     if message is None:
-        fmt_spec: str = codec.pformat_msg_spec()
         fmt_stack: str = (
             '\n'.join(traceback.format_stack(limit=3))
         )
         tb_fmt: str = pformat_boxed_tb(
             tb_str=fmt_stack,
-            # fields_str=header,
             field_prefix='  ',
             indent='',
         )
@@ -1102,8 +1104,7 @@ def _mk_msg_type_err(
             f'invalid msg -> {msg}: {type(msg)}\n\n'
             f'{tb_fmt}\n'
             f'Valid IPC msgs are:\n\n'
-            # f'  ------ - ------\n'
-            f'{fmt_spec}\n',
+            f'{codec.msg_spec_str}\n',
         )
     elif src_type_error:
         src_message: str = str(src_type_error)

View File

@@ -31,7 +31,7 @@ from typing import (
     Any,
     Callable,
     AsyncGenerator,
-    # Type,
+    TYPE_CHECKING,
 )
 from functools import partial
 from dataclasses import dataclass
@@ -46,12 +46,12 @@ from ._state import (
 from ._ipc import Channel
 from .log import get_logger
 from .msg import (
-    Error,
+    # Error,
     NamespacePath,
     Return,
 )
 from ._exceptions import (
-    unpack_error,
+    # unpack_error,
     NoResult,
 )
 from ._context import (
@@ -62,42 +62,44 @@ from ._streaming import (
     MsgStream,
 )

+if TYPE_CHECKING:
+    from ._runtime import Actor

 log = get_logger(__name__)


-# TODO: rename to `unwrap_result()` and use
-# `._raise_from_no_key_in_msg()` (after tweak to
-# accept a `chan: Channel` arg) in key block!
-def _unwrap_msg(
-    msg: Return|Error,
-    channel: Channel,
-
-    hide_tb: bool = True,
-
-) -> Any:
-    '''
-    Unwrap a final result from a `{return: <Any>}` IPC msg.
-
-    '''
-    __tracebackhide__: bool = hide_tb
-
-    try:
-        return msg.pld
-        # return msg['return']
-    # except KeyError as ke:
-    except AttributeError as err:
-
-        # internal error should never get here
-        # assert msg.get('cid'), (
-        assert msg.cid, (
-            "Received internal error at portal?"
-        )
-
-        raise unpack_error(
-            msg,
-            channel
-        ) from err
+# TODO: remove and/or rework?
+# -[ ] rename to `unwrap_result()` and use
+#     `._raise_from_unexpected_msg()` (after tweak to accept a `chan:
+#     Channel` arg) in key block??
+# -[ ] pretty sure this is entirely covered by
+#     `_exceptions._raise_from_unexpected_msg()` so REMOVE!
+# def _unwrap_msg(
+#     msg: Return|Error,
+#     ctx: Context,
+#     hide_tb: bool = True,
+# ) -> Any:
+#     '''
+#     Unwrap a final result from a `{return: <Any>}` IPC msg.
+#     '''
+#     __tracebackhide__: bool = hide_tb
+#     try:
+#         return msg.pld
+#     except AttributeError as err:
+#         # internal error should never get here
+#         # assert msg.get('cid'), (
+#         assert msg.cid, (
+#             "Received internal error at portal?"
+#         )
+#         raise unpack_error(
+#             msg,
+#             ctx.chan,
+#         ) from err


 class Portal:
@@ -123,17 +125,21 @@ class Portal:
     # connected (peer) actors.
     cancel_timeout: float = 0.5

-    def __init__(self, channel: Channel) -> None:
+    def __init__(
+        self,
+        channel: Channel,
+    ) -> None:

         self.chan = channel
         # during the portal's lifetime
-        self._result_msg: dict|None = None
+        self._final_result: Any|None = None

         # When set to a ``Context`` (when _submit_for_result is called)
         # it is expected that ``result()`` will be awaited at some
         # point.
-        self._expect_result: Context | None = None
+        self._expect_result_ctx: Context|None = None
         self._streams: set[MsgStream] = set()
-        self.actor = current_actor()
+        self.actor: Actor = current_actor()

     @property
     def channel(self) -> Channel:
@@ -147,6 +153,7 @@ class Portal:
         )
         return self.chan

+    # TODO: factor this out into an `ActorNursery` wrapper
     async def _submit_for_result(
         self,
         ns: str,
@@ -154,27 +161,18 @@ class Portal:
         **kwargs
     ) -> None:

-        assert self._expect_result is None, (
-            "A pending main result has already been submitted"
-        )
-
-        self._expect_result = await self.actor.start_remote_task(
+        if self._expect_result_ctx is not None:
+            raise RuntimeError(
+                'A pending main result has already been submitted'
+            )
+
+        self._expect_result_ctx = await self.actor.start_remote_task(
             self.channel,
             nsf=NamespacePath(f'{ns}:{func}'),
             kwargs=kwargs,
             portal=self,
         )

-    async def _return_once(
-        self,
-        ctx: Context,
-
-    ) -> Return:
-
-        assert ctx._remote_func_type == 'asyncfunc'  # single response
-        msg: Return = await ctx._recv_chan.receive()
-        return msg
-
     async def result(self) -> Any:
         '''
         Return the result(s) from the remote actor's "main" task.
@@ -188,7 +186,7 @@ class Portal:
             raise exc

         # not expecting a "main" result
-        if self._expect_result is None:
+        if self._expect_result_ctx is None:
             log.warning(
                 f"Portal for {self.channel.uid} not expecting a final"
                 " result?\nresult() should only be called if subactor"
@@ -196,17 +194,15 @@ class Portal:
             return NoResult

         # expecting a "main" result
-        assert self._expect_result
+        assert self._expect_result_ctx

-        if self._result_msg is None:
-            self._result_msg = await self._return_once(
-                self._expect_result
+        if self._final_result is None:
+            self._final_result: Any = await self._expect_result_ctx._pld_rx.recv_pld(
+                ctx=self._expect_result_ctx,
+                expect_msg=Return,
             )

-        return _unwrap_msg(
-            self._result_msg,
-            self.channel,
-        )
+        return self._final_result

     async def _cancel_streams(self):
         # terminate all locally running async generator
@@ -337,11 +333,9 @@ class Portal:
             kwargs=kwargs,
             portal=self,
         )
-        ctx._portal: Portal = self
-        msg: Return = await self._return_once(ctx)
-        return _unwrap_msg(
-            msg,
-            self.channel,
+        return await ctx._pld_rx.recv_pld(
+            ctx=ctx,
+            expect_msg=Return,
         )

     async def run(
@@ -391,10 +385,9 @@ class Portal:
             kwargs=kwargs,
             portal=self,
         )
-        ctx._portal = self
-        return _unwrap_msg(
-            await self._return_once(ctx),
-            self.channel,
+        return await ctx._pld_rx.recv_pld(
+            ctx=ctx,
+            expect_msg=Return,
         )

     @acm
@@ -427,7 +420,6 @@ class Portal:
             kwargs=kwargs,
             portal=self,
         )
-        ctx._portal = self

         # ensure receive-only stream entrypoint
         assert ctx._remote_func_type == 'asyncgen'
@@ -436,10 +428,11 @@ class Portal:
             # deliver receive only stream
             async with MsgStream(
                 ctx=ctx,
-                rx_chan=ctx._recv_chan,
-            ) as rchan:
-                self._streams.add(rchan)
-                yield rchan
+                rx_chan=ctx._rx_chan,
+            ) as stream:
+                self._streams.add(stream)
+                ctx._stream = stream
+                yield stream

         finally:
@@ -461,7 +454,7 @@ class Portal:
             # XXX: should this always be done?
             # await recv_chan.aclose()
-            self._streams.remove(rchan)
+            self._streams.remove(stream)

     # NOTE: impl is found in `._context`` mod to make
     # reading/groking the details simpler code-org-wise. This

View File

@ -181,12 +181,11 @@ async def _invoke_non_context(
# way: using the linked IPC context machinery. # way: using the linked IPC context machinery.
failed_resp: bool = False failed_resp: bool = False
try: try:
await chan.send( ack = StartAck(
StartAck( cid=cid,
cid=cid, functype='asyncfunc',
functype='asyncfunc',
)
) )
await chan.send(ack)
except ( except (
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
@ -194,12 +193,11 @@ async def _invoke_non_context(
) as ipc_err: ) as ipc_err:
failed_resp = True failed_resp = True
if is_rpc: if is_rpc:
raise raise ipc_err
else: else:
# TODO: should this be an `.exception()` call? log.exception(
log.warning( f'Failed to respond to runtime RPC request for\n\n'
f'Failed to respond to non-rpc request: {func}\n' f'{ack}\n'
f'{ipc_err}'
) )
with cancel_scope as cs: with cancel_scope as cs:
@ -220,20 +218,19 @@ async def _invoke_non_context(
and chan.connected() and chan.connected()
): ):
try: try:
await chan.send( ret_msg = return_msg(
return_msg( cid=cid,
cid=cid, pld=result,
pld=result,
)
) )
await chan.send(ret_msg)
except ( except (
BrokenPipeError, BrokenPipeError,
trio.BrokenResourceError, trio.BrokenResourceError,
): ):
log.warning( log.warning(
'Failed to return result:\n' 'Failed to send RPC result?\n'
f'{func}@{actor.uid}\n' f'|_{func}@{actor.uid}() -> {ret_msg}\n\n'
f'remote chan: {chan.uid}' f'x=> peer: {chan.uid}\n'
) )
@acm @acm
@ -250,7 +247,7 @@ async def _errors_relayed_via_ipc(
] = trio.TASK_STATUS_IGNORED, ] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
__tracebackhide__: bool = hide_tb # TODO: use hide_tb here? __tracebackhide__: bool = hide_tb
try: try:
yield # run RPC invoke body yield # run RPC invoke body
@ -262,23 +259,19 @@ async def _errors_relayed_via_ipc(
KeyboardInterrupt, KeyboardInterrupt,
) as err: ) as err:
# always hide this frame from debug REPL if the crash # NOTE: always hide this frame from debug REPL call stack
# originated from an rpc task and we DID NOT fail due to # if the crash originated from an RPC task and we DID NOT
# an IPC transport error! # fail due to an IPC transport error!
if ( if (
is_rpc is_rpc
and chan.connected() and
chan.connected()
): ):
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
# TODO: maybe we'll want different "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ?
if not is_multi_cancelled(err): if not is_multi_cancelled(err):
# TODO: maybe we'll want different "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ?
# if not isinstance(err, trio.ClosedResourceError) and (
# if not is_multi_cancelled(err) and (
entered_debug: bool = False entered_debug: bool = False
if ( if (
( (
@ -310,19 +303,18 @@ async def _errors_relayed_via_ipc(
# strange bug in our transport layer itself? Going # strange bug in our transport layer itself? Going
# to keep this open ended for now. # to keep this open ended for now.
entered_debug = await _debug._maybe_enter_pm(err) entered_debug = await _debug._maybe_enter_pm(err)
if not entered_debug: if not entered_debug:
log.exception( log.exception(
'RPC task crashed\n' 'RPC task crashed\n'
f'|_{ctx}' f'|_{ctx}'
) )
# always (try to) ship RPC errors back to caller # ALWAYS try to ship RPC errors back to parent/caller task
if is_rpc: if is_rpc:
#
# TODO: tests for this scenario: # TODO: tests for this scenario:
# - RPC caller closes connection before getting a response # - RPC caller closes connection before getting a response
# should **not** crash this actor.. # should **not** crash this actor..
await try_ship_error_to_remote( await try_ship_error_to_remote(
chan, chan,
err, err,
@ -331,33 +323,41 @@ async def _errors_relayed_via_ipc(
hide_tb=hide_tb, hide_tb=hide_tb,
) )
# error is probably from above coro running code *not from # if the ctx cs is NOT allocated, the error is likely from
# the target rpc invocation since a scope was never # above `coro` invocation machinery NOT from inside the
# allocated around the coroutine await. # `coro` itself, i.e. err is NOT a user application error.
if ctx._scope is None: if ctx._scope is None:
# we don't ever raise directly here to allow the # we don't ever raise directly here to allow the
# msg-loop-scheduler to continue running for this # msg-loop-scheduler to continue running for this
# channel. # channel.
task_status.started(err) task_status.started(err)
# always reraise KBIs so they propagate at the sys-process # always reraise KBIs so they propagate at the sys-process level.
# level.
if isinstance(err, KeyboardInterrupt): if isinstance(err, KeyboardInterrupt):
raise raise
# RPC task bookeeping.
# RPC task bookeeping # since RPC tasks are scheduled inside a flat
# `Actor._service_n`, we add "handles" to each such that
# they can be individually ccancelled.
finally: finally:
try: try:
ctx, func, is_complete = actor._rpc_tasks.pop( ctx: Context
func: Callable
is_complete: trio.Event
(
ctx,
func,
is_complete,
) = actor._rpc_tasks.pop(
(chan, ctx.cid) (chan, ctx.cid)
) )
is_complete.set() is_complete.set()
except KeyError: except KeyError:
# If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet
if is_rpc: if is_rpc:
# If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet
log.warning( log.warning(
'RPC task likely errored or cancelled before start?' 'RPC task likely errored or cancelled before start?'
f'|_{ctx._task}\n' f'|_{ctx._task}\n'
@ -372,7 +372,7 @@ async def _errors_relayed_via_ipc(
finally: finally:
if not actor._rpc_tasks: if not actor._rpc_tasks:
log.runtime("All RPC tasks have completed") log.runtime('All RPC tasks have completed')
actor._ongoing_rpc_tasks.set() actor._ongoing_rpc_tasks.set()
@ -414,19 +414,16 @@ async def _invoke(
# TODO: possibly a specially formatted traceback # TODO: possibly a specially formatted traceback
# (not sure what typing is for this..)? # (not sure what typing is for this..)?
# tb = None # tb: TracebackType = None
cancel_scope = CancelScope() cancel_scope = CancelScope()
# activated cancel scope ref cs: CancelScope|None = None # ref when activated
cs: CancelScope|None = None
ctx = actor.get_context( ctx = actor.get_context(
chan=chan, chan=chan,
cid=cid, cid=cid,
nsf=NamespacePath.from_ref(func), nsf=NamespacePath.from_ref(func),
# TODO: if we wanted to get cray and support it? # NOTE: no portal passed bc this is the "child"-side
# side='callee',
# We shouldn't ever need to pass this through right? # We shouldn't ever need to pass this through right?
# it's up to the soon-to-be called rpc task to # it's up to the soon-to-be called rpc task to
@ -459,8 +456,8 @@ async def _invoke(
kwargs['stream'] = ctx kwargs['stream'] = ctx
# handle decorated ``@tractor.context`` async function
elif getattr(func, '_tractor_context_function', False): elif getattr(func, '_tractor_context_function', False):
# handle decorated ``@tractor.context`` async function
kwargs['ctx'] = ctx kwargs['ctx'] = ctx
context = True context = True
@ -474,7 +471,8 @@ async def _invoke(
task_status=task_status, task_status=task_status,
): ):
if not ( if not (
inspect.isasyncgenfunction(func) or inspect.isasyncgenfunction(func)
or
inspect.iscoroutinefunction(func) inspect.iscoroutinefunction(func)
): ):
raise TypeError(f'{func} must be an async function!') raise TypeError(f'{func} must be an async function!')
@ -486,8 +484,7 @@ async def _invoke(
except TypeError: except TypeError:
raise raise
# TODO: implement all these cases in terms of the # TODO: impl all these cases in terms of the `Context` one!
# `Context` one!
if not context: if not context:
await _invoke_non_context( await _invoke_non_context(
actor, actor,
@ -503,7 +500,7 @@ async def _invoke(
return_msg, return_msg,
task_status, task_status,
) )
# below is only for `@context` funcs # XXX below fallthrough is ONLY for `@context` eps
return return
# our most general case: a remote SC-transitive, # our most general case: a remote SC-transitive,
@ -580,9 +577,6 @@ async def _invoke(
# itself calls `ctx._maybe_cancel_and_set_remote_error()` # itself calls `ctx._maybe_cancel_and_set_remote_error()`
# which cancels the scope presuming the input error # which cancels the scope presuming the input error
# is not a `.cancel_acked` pleaser. # is not a `.cancel_acked` pleaser.
# - currently a never-should-happen-fallthrough case
# inside ._context._drain_to_final_msg()`..
# # TODO: remove this ^ right?
if ctx._scope.cancelled_caught: if ctx._scope.cancelled_caught:
our_uid: tuple = actor.uid our_uid: tuple = actor.uid
@ -598,9 +592,7 @@ async def _invoke(
if cs.cancel_called: if cs.cancel_called:
canceller: tuple = ctx.canceller canceller: tuple = ctx.canceller
msg: str = ( explain: str = f'{ctx.side!r}-side task was cancelled by '
'actor was cancelled by '
)
# NOTE / TODO: if we end up having # NOTE / TODO: if we end up having
# ``Actor._cancel_task()`` call # ``Actor._cancel_task()`` call
@ -610,22 +602,28 @@ async def _invoke(
if ctx._cancel_called: if ctx._cancel_called:
# TODO: test for this!!!!! # TODO: test for this!!!!!
canceller: tuple = our_uid canceller: tuple = our_uid
msg += 'itself ' explain += 'itself '
# if the channel which spawned the ctx is the # if the channel which spawned the ctx is the
# one that cancelled it then we report that, vs. # one that cancelled it then we report that, vs.
# it being some other random actor that for ex. # it being some other random actor that for ex.
# some actor who calls `Portal.cancel_actor()` # some actor who calls `Portal.cancel_actor()`
# and by side-effect cancels this ctx. # and by side-effect cancels this ctx.
#
# TODO: determine if the ctx peer task was the
# exact task which cancelled, vs. some other
# task in the same actor.
elif canceller == ctx.chan.uid: elif canceller == ctx.chan.uid:
msg += 'its caller' explain += f'its {ctx.peer_side!r}-side peer'
else: else:
msg += 'a remote peer' explain += 'a remote peer'
# TODO: move this "div centering" into
# a helper for use elsewhere!
div_chars: str = '------ - ------' div_chars: str = '------ - ------'
div_offset: int = ( div_offset: int = (
round(len(msg)/2)+1 round(len(explain)/2)+1
+ +
round(len(div_chars)/2)+1 round(len(div_chars)/2)+1
) )
@ -636,11 +634,12 @@ async def _invoke(
+ +
f'{div_chars}\n' f'{div_chars}\n'
) )
msg += ( explain += (
div_str + div_str +
f'<= canceller: {canceller}\n' f'<= canceller: {canceller}\n'
f'=> uid: {our_uid}\n' f'=> cancellee: {our_uid}\n'
f' |_{ctx._task}()' # TODO: better repr for ctx tasks..
f' |_{ctx.side!r} {ctx._task}'
# TODO: instead just show the # TODO: instead just show the
# ctx.__str__() here? # ctx.__str__() here?
@@ -659,7 +658,7 @@ async def _invoke(
                # task, so relay this cancel signal to the
                # other side.
                ctxc = ContextCancelled(
-                   message=msg,
+                   message=explain,
                    boxed_type=trio.Cancelled,
                    canceller=canceller,
                )
@@ -702,11 +701,9 @@ async def _invoke(
        ctx: Context = actor._contexts.pop((
            chan.uid,
            cid,
-           # ctx.side,
        ))

        merr: Exception|None = ctx.maybe_error
        (
            res_type_str,
            res_str,
@@ -720,7 +717,7 @@ async def _invoke(
        )
        log.runtime(
            f'IPC context terminated with a final {res_type_str}\n\n'
-           f'{ctx}\n'
+           f'{ctx}'
        )
@@ -806,13 +803,19 @@ async def process_messages(
    and `Actor.cancel()` process-wide-runtime-shutdown requests
    (as utilized inside `Portal.cancel_actor()` ).

    '''
    assert actor._service_n  # state sanity

    # TODO: once `trio` get's an "obvious way" for req/resp we
    # should use it?
-   # https://github.com/python-trio/trio/issues/467
+   # -[ ] existing GH https://github.com/python-trio/trio/issues/467
+   # -[ ] for other transports (like QUIC) we can possibly just
+   #     entirely avoid the feeder mem-chans since each msg will be
+   #     delivered with a ctx-id already?
+   #
+   #  |_ for ex, from `aioquic` which exposed "stream ids":
+   #   - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
+   #   - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
    log.runtime(
        'Entering RPC msg loop:\n'
        f'peer: {chan.uid}\n'
@@ -850,7 +853,7 @@ async def process_messages(
                | Return(cid=cid)
                | CancelAck(cid=cid)

-               # `.cid` means RPC-ctx-task specific
+               # `.cid` indicates RPC-ctx-task scoped
                | Error(cid=cid)

                # recv-side `MsgType` decode violation
@@ -1046,16 +1049,16 @@ async def process_messages(
                        trio.Event(),
                    )

-               # runtime-scoped remote error (since no `.cid`)
+               # runtime-scoped remote (internal) error
+               # (^- bc no `Error.cid` -^)
+               #
+               # NOTE: this is the non-rpc error case, that
+               # is, an error NOT raised inside a call to
+               # `_invoke()` (i.e. no cid was provided in the
+               # msg - see above). Raise error inline and
+               # mark the channel as "globally errored" for
+               # all downstream consuming primitives.
                case Error():
-                   # NOTE: this is the non-rpc error case,
-                   # that is, an error **not** raised inside
-                   # a call to ``_invoke()`` (i.e. no cid was
-                   # provided in the msg - see above). Push
-                   # this error to all local channel
-                   # consumers (normally portals) by marking
-                   # the channel as errored
-                   # assert chan.uid
                    chan._exc: Exception = unpack_error(
                        msg,
                        chan=chan,
@@ -1111,7 +1114,7 @@ async def process_messages(
                    f'|_{chan.raddr}\n'
                )

-               # transport **was** disconnected
+               # transport **WAS** disconnected
                return True

            except (
@@ -1150,12 +1153,11 @@ async def process_messages(
    finally:
        # msg debugging for when he machinery is brokey
        log.runtime(
-           'Exiting IPC msg loop with\n'
-           f'peer: {chan.uid}\n'
+           'Exiting IPC msg loop with final msg\n\n'
+           f'<= peer: {chan.uid}\n'
            f'|_{chan}\n\n'
-           'final msg:\n'
-           f'{pformat(msg)}\n'
+           f'{pformat(msg)}\n\n'
        )

-   # transport **was not** disconnected
+   # transport **WAS NOT** disconnected
    return False

View File

@@ -817,8 +817,8 @@ class Actor:
                state.max_buffer_size = msg_buffer_size

        except KeyError:
-           log.runtime(
-               f'Creating NEW IPC ctx for\n'
+           log.debug(
+               f'Allocate new IPC ctx for\n'
                f'peer: {chan.uid}\n'
                f'cid: {cid}\n'
            )
@@ -850,7 +850,7 @@ class Actor:
        msg_buffer_size: int|None = None,
        allow_overruns: bool = False,
        load_nsf: bool = False,
-       ack_timeout: float = 3,
+       ack_timeout: float = float('inf'),

    ) -> Context:
        '''
@@ -906,7 +906,7 @@ class Actor:
        # this should be immediate and does not (yet) wait for the
        # remote child task to sync via `Context.started()`.
        with trio.fail_after(ack_timeout):
-           first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
+           first_msg: msgtypes.StartAck = await ctx._rx_chan.receive()
            try:
                functype: str = first_msg.functype
            except AttributeError:
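
Note the `ack_timeout` flip back to `float('inf')` above is safe with `trio` since deadlines may be infinite: a `fail_after(inf)` scope simply never expires. A standalone demonstration:

import math
import trio

async def main() -> None:
    # an infinite-timeout scope never raises `TooSlowError`;
    # it behaves as if no timeout were applied at all.
    with trio.fail_after(math.inf):
        await trio.sleep(0.1)
    print('ack wait completed, no deadline hit')

trio.run(main)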

View File

@@ -35,7 +35,7 @@ import warnings
import trio

from ._exceptions import (
-   _raise_from_no_key_in_msg,
+   # _raise_from_no_key_in_msg,
    ContextCancelled,
)
from .log import get_logger
@@ -44,8 +44,9 @@ from .trionics import (
    BroadcastReceiver,
)
from tractor.msg import (
-   Return,
-   Stop,
+   # Return,
+   # Stop,
+   MsgType,
    Yield,
)
@@ -94,24 +95,23 @@ class MsgStream(trio.abc.Channel):
        self._eoc: bool|trio.EndOfChannel = False
        self._closed: bool|trio.ClosedResourceError = False

+   # TODO: could we make this a direct method bind to `PldRx`?
+   # -> receive_nowait = PldRx.recv_pld
+   # |_ means latter would have to accept `MsgStream`-as-`self`?
+   #    => should be fine as long as,
+   #    -[ ] both define `._rx_chan`
+   #    -[ ] .ctx is bound into `PldRx` using a `@cm`?
+   #
    # delegate directly to underlying mem channel
    def receive_nowait(
        self,
-       allow_msgs: list[str] = Yield,
+       expect_msg: MsgType = Yield,
    ):
-       msg: Yield|Stop = self._rx_chan.receive_nowait()
-       # TODO: replace msg equiv of this or does the `.pld`
-       # interface read already satisfy it? I think so, yes?
-       try:
-           return msg.pld
-       except AttributeError as attrerr:
-           _raise_from_no_key_in_msg(
-               ctx=self._ctx,
-               msg=msg,
-               src_err=attrerr,
-               log=log,
-               stream=self,
-           )
+       ctx: Context = self._ctx
+       return ctx._pld_rx.recv_pld_nowait(
+           ctx=ctx,
+           expect_msg=expect_msg,
+       )

    async def receive(
        self,
@@ -146,24 +146,9 @@ class MsgStream(trio.abc.Channel):
        src_err: Exception|None = None  # orig tb
        try:
-           try:
-               msg: Yield = await self._rx_chan.receive()
-               return msg.pld
-
-           # TODO: implement with match: instead?
-           except AttributeError as attrerr:
-               # src_err = kerr
-               src_err = attrerr
-
-               # NOTE: may raise any of the below error types
-               # includg EoC when a 'stop' msg is found.
-               _raise_from_no_key_in_msg(
-                   ctx=self._ctx,
-                   msg=msg,
-                   src_err=attrerr,
-                   log=log,
-                   stream=self,
-               )
+           ctx: Context = self._ctx
+           return await ctx._pld_rx.recv_pld(ctx=ctx)

        # XXX: the stream terminates on either of:
        # - via `self._rx_chan.receive()` raising after manual closure
@@ -228,7 +213,7 @@ class MsgStream(trio.abc.Channel):
            # probably want to instead raise the remote error
            # over the end-of-stream connection error since likely
            # the remote error was the source cause?
-           ctx: Context = self._ctx
+           # ctx: Context = self._ctx
            ctx.maybe_raise(
                raise_ctxc_from_self_call=True,
            )
@@ -292,7 +277,8 @@ class MsgStream(trio.abc.Channel):
        while not drained:
            try:
                maybe_final_msg = self.receive_nowait(
-                   allow_msgs=[Yield, Return],
+                   # allow_msgs=[Yield, Return],
+                   expect_msg=Yield,
                )
                if maybe_final_msg:
                    log.debug(
@@ -472,6 +458,9 @@ class MsgStream(trio.abc.Channel):
            self,
            # use memory channel size by default
            self._rx_chan._state.max_buffer_size,  # type: ignore
+
+           # TODO: can remove this kwarg right since
+           # by default behaviour is to do this anyway?
            receive_afunc=self.receive,
        )
@@ -517,19 +506,11 @@ class MsgStream(trio.abc.Channel):
            raise self._closed

        try:
-           # await self._ctx.chan.send(
-           #     payload={
-           #         'yield': data,
-           #         'cid': self._ctx.cid,
-           #     },
-           #     # hide_tb=hide_tb,
-           # )
            await self._ctx.chan.send(
                payload=Yield(
                    cid=self._ctx.cid,
                    pld=data,
                ),
-               # hide_tb=hide_tb,
            )
        except (
            trio.ClosedResourceError,
@@ -562,7 +543,7 @@ def stream(func: Callable) -> Callable:
    '''
    # TODO: apply whatever solution ``mypy`` ends up picking for this:
    # https://github.com/python/mypy/issues/2087#issuecomment-769266912
-   func._tractor_stream_function = True  # type: ignore
+   func._tractor_stream_function: bool = True  # type: ignore

    sig = inspect.signature(func)
    params = sig.parameters
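
For orientation, the consumer-facing API is unchanged by the `PldRx` delegation above; each `async for` iteration now routes through `PldRx.recv_pld()` under the hood. A rough caller-side sketch, where `portal` and `streamer` are illustrative placeholders and not part of this diff:

async with (
    portal.open_context(streamer) as (ctx, first),
    ctx.open_stream() as stream,
):
    # each iteration -> `MsgStream.receive()`
    # -> `ctx._pld_rx.recv_pld()` -> decoded `.pld` value
    async for pld in stream:
        print(f'got payload: {pld!r}')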

View File

@@ -37,6 +37,11 @@ from ._codec import (
    MsgDec as MsgDec,
    current_codec as current_codec,
)
+# currently can't bc circular with `._context`
+# from ._ops import (
+#     PldRx as PldRx,
+#     _drain_to_final_msg as _drain_to_final_msg,
+# )
from .types import (
    Msg as Msg,
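
Re the "circular with `._context`" note above: one standard way to expose such names without triggering the cycle at import time is a lazy, PEP-562 style module `__getattr__`. A sketch only, not what this changeset does:

def __getattr__(name: str):
    # defer the import until first attribute access so that
    # `._ops` (which imports `._context`) is never pulled in
    # during this package's own import.
    if name in ('PldRx', 'drain_to_final_msg'):
        from . import _ops
        return getattr(_ops, name)
    raise AttributeError(name)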

View File

@@ -75,7 +75,7 @@ log = get_logger(__name__)
# TODO: unify with `MsgCodec` by making `._dec` part this?
class MsgDec(Struct):
    '''
-   An IPC msg decoder.
+   An IPC msg (payload) decoder.

    Normally used to decode only a payload: `MsgType.pld:
    PayloadT` field before delivery to IPC consumer code.
@@ -87,6 +87,31 @@ class MsgDec(Struct):
    def dec(self) -> msgpack.Decoder:
        return self._dec

+   def __repr__(self) -> str:
+       speclines: str = self.spec_str
+
+       # in multi-typed spec case we stick the list
+       # all on newlines after the |__pld_spec__:,
+       # OW it's prolly single type spec-value
+       # so just leave it on same line.
+       if '\n' in speclines:
+           speclines: str = '\n' + textwrap.indent(
+               speclines,
+               prefix=' '*3,
+           )
+
+       body: str = textwrap.indent(
+           f'|_dec_hook: {self.dec.dec_hook}\n'
+           f'|__pld_spec__: {speclines}\n',
+           prefix=' '*2,
+       )
+       return (
+           f'<{type(self).__name__}(\n'
+           f'{body}'
+           ')>'
+       )
+
    # struct type unions
    # https://jcristharif.com/msgspec/structs.html#tagged-unions
    #
@@ -137,17 +162,7 @@ class MsgDec(Struct):
    # TODO: would get moved into `FieldSpec.__str__()` right?
    @property
    def spec_str(self) -> str:
-
-       # TODO: could also use match: instead?
-       spec: Union[Type]|Type = self.spec
-
-       # `typing.Union` case
-       if getattr(spec, '__args__', False):
-           return str(spec)
-
-       # just a single type
-       else:
-           return spec.__name__
+       return pformat_msgspec(codec=self)

    pld_spec_str = spec_str

@@ -168,9 +183,57 @@ def mk_dec(

) -> MsgDec:

-   return msgpack.Decoder(
-       type=spec,  # like `Msg[Any]`
-       dec_hook=dec_hook,
-   )
+   return MsgDec(
+       _dec=msgpack.Decoder(
+           type=spec,  # like `Msg[Any]`
+           dec_hook=dec_hook,
+       )
+   )
+
+
+def mk_msgspec_table(
+   dec: msgpack.Decoder,
+   msg: MsgType|None = None,
+
+) -> dict[str, MsgType]|str:
+   '''
+   Fill out a `dict` of `MsgType`s keyed by name
+   for a given input `msgspec.msgpack.Decoder`
+   as defined by its `.type: Union[Type]` setting.
+
+   If `msg` is provided, only deliver a `dict` with a single
+   entry for that type.
+
+   '''
+   msgspec: Union[Type]|Type = dec.type
+
+   if not (msgtypes := getattr(msgspec, '__args__', False)):
+       msgtypes = [msgspec]
+
+   msgt_table: dict[str, MsgType] = {
+       msgt: str(msgt)
+       for msgt in msgtypes
+   }
+   if msg:
+       msgt: MsgType = type(msg)
+       str_repr: str = msgt_table[msgt]
+       return {msgt: str_repr}
+
+   return msgt_table
+
+
+def pformat_msgspec(
+   codec: MsgCodec|MsgDec,
+   msg: MsgType|None = None,
+   join_char: str = '\n',
+
+) -> str:
+   dec: msgpack.Decoder = getattr(codec, 'dec', codec)
+   return join_char.join(
+       mk_msgspec_table(
+           dec=dec,
+           msg=msg,
+       ).values()
+   )

# TODO: overall IPC msg-spec features (i.e. in this mod)!
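
A quick sketch of what the new helpers produce, assuming they are imported from this module (`tractor.msg._codec`) and given a tagged-union decoder (`msgspec` requires struct unions to be tagged):

from typing import Union

import msgspec
from msgspec import msgpack
from tractor.msg._codec import (
    mk_msgspec_table,
    pformat_msgspec,
)

class Ping(msgspec.Struct, tag=True):
    ts: float

class Pong(msgspec.Struct, tag=True):
    ts: float

dec = msgpack.Decoder(type=Union[Ping, Pong])

# one `MsgType -> str` entry per union member..
assert len(mk_msgspec_table(dec)) == 2

# ..which `pformat_msgspec()` newline-joins for repr output.
print(pformat_msgspec(codec=dec))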
@@ -200,7 +263,7 @@ class MsgCodec(Struct):
    def __repr__(self) -> str:
        speclines: str = textwrap.indent(
-           self.pformat_msg_spec(),
+           pformat_msgspec(codec=self),
            prefix=' '*3,
        )
        body: str = textwrap.indent(
@@ -244,33 +307,11 @@ class MsgCodec(Struct):
        # NOTE: defined and applied inside `mk_codec()`
        return self._dec.type

-   def msg_spec_items(
-       self,
-       msg: MsgType|None = None,
-
-   ) -> dict[str, MsgType]|str:
-
-       msgt_table: dict[str, MsgType] = {
-           msgt: str(msgt)
-           for msgt in self.msg_spec.__args__
-       }
-       if msg:
-           msgt: MsgType = type(msg)
-           str_repr: str = msgt_table[msgt]
-           return {msgt: str_repr}
-
-       return msgt_table
-
    # TODO: some way to make `pretty_struct.Struct` use this
    # wrapped field over the `.msg_spec` one?
-   def pformat_msg_spec(
-       self,
-       msg: MsgType|None = None,
-       join_char: str = '\n',
-
-   ) -> str:
-       return join_char.join(
-           self.msg_spec_items(msg=msg).values()
-       )
+   @property
+   def msg_spec_str(self) -> str:
+       return pformat_msgspec(self.msg_spec)

    lib: ModuleType = msgspec

@@ -280,17 +321,32 @@ class MsgCodec(Struct):
    def enc(self) -> msgpack.Encoder:
        return self._enc

+   # TODO: reusing encode buffer for perf?
+   # https://jcristharif.com/msgspec/perf-tips.html#reusing-an-output-buffer
+   _buf: bytearray = bytearray()
+
    def encode(
        self,
        py_obj: Any,
+
+       use_buf: bool = False,
+       # ^-XXX-^ uhh why am i getting this?
+       # |_BufferError: Existing exports of data: object cannot be re-sized

    ) -> bytes:
        '''
        Encode input python objects to `msgpack` bytes for
        transfer on a tranport protocol connection.

+       When `use_buf == True` use the output buffer optimization:
+       https://jcristharif.com/msgspec/perf-tips.html#reusing-an-output-buffer
+
        '''
-       return self._enc.encode(py_obj)
+       if use_buf:
+           self._enc.encode_into(py_obj, self._buf)
+           return self._buf
+       else:
+           return self._enc.encode(py_obj)

    @property
    def dec(self) -> msgpack.Decoder:
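
For reference, the upstream buffer-reuse pattern looks roughly like the below. The `BufferError` noted in the XXX comment above is plausibly triggered when the shared `bytearray` gets resized by a later `encode_into()` while a consumer (e.g. the transport layer) still holds a `memoryview` export of it; copying out immutable `bytes` side-steps that. A sketch, not the committed fix:

from msgspec import msgpack

enc = msgpack.Encoder()
buf = bytearray()  # reused scratch buffer

def encode_copy(obj) -> bytes:
    # encode into the scratch buffer (resized in-place as
    # needed), then copy out an immutable `bytes` so future
    # resizes never collide with live buffer exports.
    enc.encode_into(obj, buf)
    return bytes(buf)

print(encode_copy({'ping': 1}))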

tractor/msg/_ops.py 100644 (new file, 563 lines)

View File

@@ -0,0 +1,563 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Near-application abstractions for `MsgType.pld: PayloadT|Raw`
delivery, filtering and type checking as well as generic
operational helpers for processing transaction flows.
'''
from __future__ import annotations
from contextlib import (
# asynccontextmanager as acm,
contextmanager as cm,
)
from pprint import pformat
from typing import (
Any,
Type,
TYPE_CHECKING,
# Union,
)
# ------ - ------
from msgspec import (
msgpack,
Raw,
Struct,
ValidationError,
)
import trio
# ------ - ------
from tractor.log import get_logger
from tractor._exceptions import (
MessagingError,
InternalError,
_raise_from_unexpected_msg,
MsgTypeError,
_mk_msg_type_err,
pack_from_raise,
)
from ._codec import (
mk_dec,
MsgDec,
)
from .types import (
CancelAck,
Error,
MsgType,
PayloadT,
Return,
Started,
Stop,
Yield,
# pretty_struct,
)
if TYPE_CHECKING:
from tractor._context import Context
from tractor._streaming import MsgStream
log = get_logger(__name__)
class PldRx(Struct):
'''
A "msg payload receiver".
The pairing of a "feeder" `trio.abc.ReceiveChannel` and an
interchange-specific (eg. msgpack) payload field decoder. The
validation/type-filtering rules are runtime mutable and allow
type constraining the set of `MsgType.pld: Raw|PayloadT`
values at runtime, per IPC task-context.
This abstraction, being just below "user application code",
allows for the equivalent of our `MsgCodec` (used for
typer-filtering IPC dialog protocol msgs against a msg-spec)
but with granular control around payload delivery (i.e. the
data-values user code actually sees and uses (the blobs that
are "shuttled" by the wrapping dialog prot) such that invalid
`.pld: Raw` can be decoded and handled by IPC-primitive user
code (i.e. that operates on `Context` and `Msgstream` APIs)
without knowledge of the lower level `Channel`/`MsgTransport`
primitives nor the `MsgCodec` in use. Further, lazily decoding
payload blobs allows for topical (and maybe intentionally
"partial") encryption of msg field subsets.
'''
# TODO: better to bind it here?
# _rx_mc: trio.MemoryReceiveChannel
_msgdec: MsgDec = mk_dec(spec=Any)
_ipc: Context|MsgStream|None = None
@cm
def apply_to_ipc(
self,
ipc_prim: Context|MsgStream,
) -> PldRx:
'''
Apply this payload receiver to an IPC primitive type, one
of `Context` or `MsgStream`.
'''
self._ipc = ipc_prim
try:
yield self
finally:
self._ipc = None
@property
def dec(self) -> msgpack.Decoder:
return self._msgdec.dec
def recv_pld_nowait(
self,
# TODO: make this `MsgStream` compat as well, see above^
# ipc_prim: Context|MsgStream,
ctx: Context,
ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None,
**kwargs,
) -> Any|Raw:
msg: MsgType = (
ipc_msg
or
# sync-rx msg from underlying IPC feeder (mem-)chan
ctx._rx_chan.receive_nowait()
)
return self.dec_msg(
msg,
ctx=ctx,
expect_msg=expect_msg,
)
async def recv_pld(
self,
ctx: Context,
ipc_msg: MsgType|None = None,
expect_msg: Type[MsgType]|None = None,
**kwargs
) -> Any|Raw:
'''
Receive a `MsgType`, then decode and return its `.pld` field.
'''
msg: MsgType = (
ipc_msg
or
# async-rx msg from underlying IPC feeder (mem-)chan
await ctx._rx_chan.receive()
)
return self.dec_msg(
msg,
ctx=ctx,
expect_msg=expect_msg,
)
def dec_msg(
self,
msg: MsgType,
ctx: Context,
expect_msg: Type[MsgType]|None = None,
) -> PayloadT|Raw:
'''
Decode a msg's payload field: `MsgType.pld: PayloadT|Raw` and
return the value or raise an appropriate error.
'''
match msg:
# payload-data shuttle msg; deliver the `.pld` value
# directly to IPC (primitive) client-consumer code.
case (
Started(pld=pld) # sync phase
|Yield(pld=pld) # streaming phase
|Return(pld=pld) # termination phase
):
try:
pld: PayloadT = self._msgdec.decode(pld)
log.runtime(
'Decode msg payload\n\n'
f'{msg}\n\n'
f'{pld}\n'
)
return pld
# XXX pld-type failure
except ValidationError as src_err:
msgterr: MsgTypeError = _mk_msg_type_err(
msg=msg,
codec=self._dec,
src_validation_error=src_err,
)
msg: Error = pack_from_raise(
local_err=msgterr,
cid=msg.cid,
src_uid=ctx.chan.uid,
)
# XXX some other decoder specific failure?
# except TypeError as src_error:
# from .devx import mk_pdb
# mk_pdb().set_trace()
# raise src_error
# a runtime-internal RPC endpoint response.
# always passthrough since (internal) runtime
# responses are generally never exposed to consumer
# code.
case CancelAck(
pld=bool(cancelled)
):
return cancelled
case Error():
src_err = MessagingError(
'IPC dialog termination by msg'
)
case _:
src_err = InternalError(
'Unknown IPC msg ??\n\n'
f'{msg}\n'
)
# fallthrough and raise from `src_err`
_raise_from_unexpected_msg(
ctx=ctx,
msg=msg,
src_err=src_err,
log=log,
expect_msg=expect_msg,
hide_tb=False,
)
async def recv_msg_w_pld(
self,
ipc: Context|MsgStream,
) -> tuple[MsgType, PayloadT]:
'''
Retrieve the next avail IPC msg, decode it's payload, and return
the pair of refs.
'''
msg: MsgType = await ipc._rx_chan.receive()
# TODO: is there some way we can inject the decoded
# payload into an existing output buffer for the original
# msg instance?
pld: PayloadT = self.dec_msg(
msg,
ctx=ipc,
)
return msg, pld
async def drain_to_final_msg(
ctx: Context,
hide_tb: bool = True,
msg_limit: int = 6,
) -> tuple[
Return|None,
list[MsgType]
]:
'''
Drain IPC msgs delivered to the underlying IPC primitive's
rx-mem-chan (eg. `Context._rx_chan`) from the runtime in
search for a final result or error.
The motivation here is to ideally capture errors during ctxc
conditions where a canc-request/or local error is sent but the
local task also excepts and enters the
`Portal.open_context().__aexit__()` block wherein we prefer to
capture and raise any remote error or ctxc-ack as part of the
`ctx.result()` cleanup and teardown sequence.
'''
__tracebackhide__: bool = hide_tb
raise_overrun: bool = not ctx._allow_overruns
# wait for a final context result by collecting (but
# basically ignoring) any bi-dir-stream msgs still in transit
# from the far end.
pre_result_drained: list[MsgType] = []
return_msg: Return|None = None
while not (
ctx.maybe_error
and not ctx._final_result_is_set()
):
try:
# TODO: can remove?
# await trio.lowlevel.checkpoint()
# NOTE: this REPL usage actually works here dawg! Bo
# from .devx._debug import pause
# await pause()
# TODO: bad idea?
# -[ ] wrap final outcome channel wait in a scope so
# it can be cancelled out of band if needed?
#
# with trio.CancelScope() as res_cs:
# ctx._res_scope = res_cs
# msg: dict = await ctx._rx_chan.receive()
# if res_cs.cancelled_caught:
# TODO: ensure there's no more hangs, debugging the
# runtime pretty preaase!
# from .devx._debug import pause
# await pause()
# TODO: can remove this finally?
# we have no more need for the sync draining right
# since we're can kinda guarantee the async
# `.receive()` below will never block yah?
#
# if (
# ctx._cancel_called and (
# ctx.cancel_acked
# # or ctx.chan._cancel_called
# )
# # or not ctx._final_result_is_set()
# # ctx.outcome is not
# # or ctx.chan._closed
# ):
# try:
# msg: dict = await ctx._rx_chan.receive_nowait()()
# except trio.WouldBlock:
# log.warning(
# 'When draining already `.cancel_called` ctx!\n'
# 'No final msg arrived..\n'
# )
# break
# else:
# msg: dict = await ctx._rx_chan.receive()
# TODO: don't need it right jefe?
# with trio.move_on_after(1) as cs:
# if cs.cancelled_caught:
# from .devx._debug import pause
# await pause()
# pray to the `trio` gawds that we're corrent with this
# msg: dict = await ctx._rx_chan.receive()
msg, pld = await ctx._pld_rx.recv_msg_w_pld(ipc=ctx)
# NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases:
# 1. we requested the cancellation and thus
# SHOULD NOT raise that far end error,
# 2. WE DID NOT REQUEST that cancel and thus
# SHOULD RAISE HERE!
except trio.Cancelled:
# CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is
# the source cause of this local task's
# cancellation.
ctx.maybe_raise()
# CASE 1: we DID request the cancel we simply
# continue to bubble up as normal.
raise
match msg:
# final result arrived!
case Return(
# cid=cid,
# pld=res,
):
# ctx._result: Any = res
ctx._result: Any = pld
log.runtime(
'Context delivered final draining msg:\n'
f'{pformat(msg)}'
)
# XXX: only close the rx mem chan AFTER
# a final result is retreived.
# if ctx._rx_chan:
# await ctx._rx_chan.aclose()
# TODO: ^ we don't need it right?
return_msg = msg
break
# far end task is still streaming to us so discard
# and report depending on local ctx state.
case Yield():
pre_result_drained.append(msg)
if (
(ctx._stream.closed
and (reason := 'stream was already closed')
)
or (ctx.cancel_acked
and (reason := 'ctx cancelled other side')
)
or (ctx._cancel_called
and (reason := 'ctx called `.cancel()`')
)
or (len(pre_result_drained) > msg_limit
and (reason := f'"yield" limit={msg_limit}')
)
):
log.cancel(
'Cancelling `MsgStream` drain since '
f'{reason}\n\n'
f'<= {ctx.chan.uid}\n'
f' |_{ctx._nsf}()\n\n'
f'=> {ctx._task}\n'
f' |_{ctx._stream}\n\n'
f'{pformat(msg)}\n'
)
return (
return_msg,
pre_result_drained,
)
# drain up to the `msg_limit` hoping to get
# a final result or error/ctxc.
else:
log.warning(
'Ignoring "yield" msg during `ctx.result()` drain..\n'
f'<= {ctx.chan.uid}\n'
f' |_{ctx._nsf}()\n\n'
f'=> {ctx._task}\n'
f' |_{ctx._stream}\n\n'
f'{pformat(msg)}\n'
)
continue
# stream terminated, but no result yet..
#
# TODO: work out edge cases here where
# a stream is open but the task also calls
# this?
# -[ ] should be a runtime error if a stream is open right?
# Stop()
case Stop():
pre_result_drained.append(msg)
log.cancel(
'Remote stream terminated due to "stop" msg:\n\n'
f'{pformat(msg)}\n'
)
continue
# remote error msg, likely already handled inside
# `Context._deliver_msg()`
case Error():
# TODO: can we replace this with `ctx.maybe_raise()`?
# -[ ] would this be handier for this case maybe?
# async with maybe_raise_on_exit() as raises:
# if raises:
# log.error('some msg about raising..')
#
re: Exception|None = ctx._remote_error
if re:
assert msg is ctx._cancel_msg
# NOTE: this solved a super duper edge case XD
# this was THE super duper edge case of:
# - local task opens a remote task,
# - requests remote cancellation of far end
# ctx/tasks,
# - needs to wait for the cancel ack msg
# (ctxc) or some result in the race case
# where the other side's task returns
# before the cancel request msg is ever
# rxed and processed,
# - here this surrounding drain loop (which
# iterates all ipc msgs until the ack or
# an early result arrives) was NOT exiting
# since we are the edge case: local task
# does not re-raise any ctxc it receives
# IFF **it** was the cancellation
# requester..
#
# XXX will raise if necessary but ow break
# from loop presuming any supressed error
# (ctxc) should terminate the context!
ctx._maybe_raise_remote_err(
re,
# NOTE: obvi we don't care if we
# overran the far end if we're already
# waiting on a final result (msg).
# raise_overrun_from_self=False,
raise_overrun_from_self=raise_overrun,
)
break # OOOOOF, yeah obvi we need this..
# XXX we should never really get here
# right! since `._deliver_msg()` should
# always have detected an {'error': ..}
# msg and already called this right!?!
# elif error := unpack_error(
# msg=msg,
# chan=ctx._portal.channel,
# hide_tb=False,
# ):
# log.critical('SHOULD NEVER GET HERE!?')
# assert msg is ctx._cancel_msg
# assert error.msgdata == ctx._remote_error.msgdata
# assert error.ipc_msg == ctx._remote_error.ipc_msg
# from .devx._debug import pause
# await pause()
# ctx._maybe_cancel_and_set_remote_error(error)
# ctx._maybe_raise_remote_err(error)
else:
# bubble the original src key error
raise
# XXX should pretty much never get here unless someone
# overrides the default `MsgType` spec.
case _:
pre_result_drained.append(msg)
# It's definitely an internal error if any other
# msg type without a`'cid'` field arrives here!
if not msg.cid:
raise InternalError(
'Unexpected cid-missing msg?\n\n'
f'{msg}\n'
)
raise RuntimeError('Unknown msg type: {msg}')
else:
log.cancel(
'Skipping `MsgStream` drain since final outcome is set\n\n'
f'{ctx.outcome}\n'
)
return (
return_msg,
pre_result_drained,
)
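
End-to-end, a result-wait over the new machinery would look roughly like this sketch; it assumes `ctx` comes from the usual `Portal.open_context()` entry and that the runtime has wired up `ctx._pld_rx` as shown above:

from tractor.msg._ops import drain_to_final_msg

async def wait_for_result(ctx) -> object:
    return_msg, drained = await drain_to_final_msg(
        ctx=ctx,
        msg_limit=6,  # max "yield" msgs to discard while draining
    )
    if drained:
        print(f'discarded {len(drained)} in-transit msgs')

    # `return_msg` is `None` when the far end errored or
    # was cancelled instead of returning.
    return return_msg.pld if return_msg else None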

View File

@@ -102,6 +102,59 @@ def iter_fields(struct: Struct) -> Iterator[
    )


+def pformat(
+   struct: Struct,
+   field_indent: int = 2,
+   indent: int = 0,
+
+) -> str:
+   '''
+   Recursion-safe `pprint.pformat()` style formatting of
+   a `msgspec.Struct` for sane reading by a human using a REPL.
+
+   '''
+   # global whitespace indent
+   ws: str = ' '*indent
+
+   # field whitespace indent
+   field_ws: str = ' '*(field_indent + indent)
+
+   # qtn: str = ws + struct.__class__.__qualname__
+   qtn: str = struct.__class__.__qualname__
+
+   obj_str: str = ''  # accumulator
+   fi: structs.FieldInfo
+   k: str
+   v: Any
+   for fi, k, v in iter_fields(struct):
+
+       # TODO: how can we prefer `Literal['option1',  'option2,
+       # ..]` over .__name__ == `Literal` but still get only the
+       # latter for simple types like `str | int | None` etc..?
+       ft: type = fi.type
+       typ_name: str = getattr(ft, '__name__', str(ft))
+
+       # recurse to get sub-struct's `.pformat()` output Bo
+       if isinstance(v, Struct):
+           val_str: str = v.pformat(
+               indent=field_indent + indent,
+               field_indent=indent + field_indent,
+           )
+       else:  # the `pprint` recursion-safe format:
+           # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
+           val_str: str = saferepr(v)
+
+       # TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
+       obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
+
+   return (
+       f'{qtn}(\n'
+       f'{obj_str}'
+       f'{ws})'
+   )
+
+
class Struct(
    _Struct,
@@ -140,65 +193,12 @@ class Struct(
        return sin_props

-   # TODO: make thisi a mod-func!
-   def pformat(
-       self,
-       field_indent: int = 2,
-       indent: int = 0,
-
-   ) -> str:
-       '''
-       Recursion-safe `pprint.pformat()` style formatting of
-       a `msgspec.Struct` for sane reading by a human using a REPL.
-
-       '''
-       # global whitespace indent
-       ws: str = ' '*indent
-
-       # field whitespace indent
-       field_ws: str = ' '*(field_indent + indent)
-
-       # qtn: str = ws + self.__class__.__qualname__
-       qtn: str = self.__class__.__qualname__
-
-       obj_str: str = ''  # accumulator
-       fi: structs.FieldInfo
-       k: str
-       v: Any
-       for fi, k, v in iter_fields(self):
-
-           # TODO: how can we prefer `Literal['option1',  'option2,
-           # ..]` over .__name__ == `Literal` but still get only the
-           # latter for simple types like `str | int | None` etc..?
-           ft: type = fi.type
-           typ_name: str = getattr(ft, '__name__', str(ft))
-
-           # recurse to get sub-struct's `.pformat()` output Bo
-           if isinstance(v, Struct):
-               val_str: str = v.pformat(
-                   indent=field_indent + indent,
-                   field_indent=indent + field_indent,
-               )
-           else:  # the `pprint` recursion-safe format:
-               # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
-               val_str: str = saferepr(v)
-
-           # TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
-           obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
-
-       return (
-           f'{qtn}(\n'
-           f'{obj_str}'
-           f'{ws})'
-       )
+   pformat = pformat
+   # __str__ = __repr__ = pformat

    # TODO: use a pprint.PrettyPrinter instance around ONLY rendering
    # inside a known tty?
    # def __repr__(self) -> str:
    #     ...

-   # __str__ = __repr__ = pformat
    __repr__ = pformat

    def copy(
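
With `pformat()` hoisted to module level and re-bound as a method alias, both call styles render identically. A tiny illustration; the `Point` struct is made up:

from tractor.msg import pretty_struct

class Point(pretty_struct.Struct):
    x: int
    y: int

p = Point(x=1, y=2)

# the mod-level func and the `Struct.pformat` alias are the
# same callable, so output matches either way.
assert pretty_struct.pformat(p) == p.pformat()
print(p)  # `__repr__ = pformat` -> multi-line field listing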