forked from goodboy/tractor
1
0
Fork 0

Start tidying up `._context`, use `pack_from_raise()`

Mostly removing commented (and replaced) code blocks lingering from the
ctxc semantics work and new typed-msg-spec `MsgType`s handling AND use
the new `._exceptions.pack_from_raise()` helper to construct
`StreamOverrun` msgs.

Deaterz:
- clean out the drain loop now that it's implemented to handle our
  struct msg types including the `dict`-msg bits left in as
  fallback-reminders, any notes/todos better summarized at the top of
  their blocks, remove any `_final_result_is_set()` related duplicate/legacy
  tidbits.
- use a `case Error()` block in drain loop with fallthrough to `_:`
  always resulting in an rte raise.
- move "XXX" notes into the doc-string for `._deliver_msg()` as
  a "rules" section.
- use `match:` syntax for logging the `result_or_err: MsgType` outcome
  from the final `.result()` call inside `open_context_from_portal()`.
- generally speaking use `MsgType` type annotations throughout!
runtime_to_msgspec
Tyler Goodlet 2024-04-09 13:46:34 -04:00
parent a35c1d40ab
commit 8839bb06a3
3 changed files with 95 additions and 136 deletions

View File

@ -49,20 +49,21 @@ from ._exceptions import (
InternalError, InternalError,
RemoteActorError, RemoteActorError,
StreamOverrun, StreamOverrun,
pack_error, pack_from_raise,
unpack_error, unpack_error,
_raise_from_no_key_in_msg, _raise_from_no_key_in_msg,
) )
from .log import get_logger from .log import get_logger
from .msg import ( from .msg import (
Error,
MsgType,
MsgCodec,
NamespacePath, NamespacePath,
Msg,
Return, Return,
Started, Started,
Stop, Stop,
Yield, Yield,
current_codec, current_codec,
MsgCodec,
pretty_struct, pretty_struct,
) )
from ._ipc import Channel from ._ipc import Channel
@ -107,8 +108,7 @@ async def _drain_to_final_msg(
# wait for a final context result by collecting (but # wait for a final context result by collecting (but
# basically ignoring) any bi-dir-stream msgs still in transit # basically ignoring) any bi-dir-stream msgs still in transit
# from the far end. # from the far end.
# pre_result_drained: list[dict] = [] pre_result_drained: list[MsgType] = []
pre_result_drained: list[Msg] = []
while not ( while not (
ctx.maybe_error ctx.maybe_error
and not ctx._final_result_is_set() and not ctx._final_result_is_set()
@ -168,7 +168,7 @@ async def _drain_to_final_msg(
# pray to the `trio` gawds that we're corrent with this # pray to the `trio` gawds that we're corrent with this
# msg: dict = await ctx._recv_chan.receive() # msg: dict = await ctx._recv_chan.receive()
msg: Msg = await ctx._recv_chan.receive() msg: MsgType = await ctx._recv_chan.receive()
# always capture unexpected/non-result msgs # always capture unexpected/non-result msgs
pre_result_drained.append(msg) pre_result_drained.append(msg)
@ -191,13 +191,12 @@ async def _drain_to_final_msg(
raise raise
match msg: match msg:
# final result arrived!
case Return( case Return(
cid=cid, # cid=cid,
pld=res, pld=res,
): ):
# try:
# ctx._result: Any = msg['return']
# ctx._result: Any = msg.pld
ctx._result: Any = res ctx._result: Any = res
log.runtime( log.runtime(
'Context delivered final draining msg:\n' 'Context delivered final draining msg:\n'
@ -210,13 +209,9 @@ async def _drain_to_final_msg(
# TODO: ^ we don't need it right? # TODO: ^ we don't need it right?
break break
# except KeyError: # far end task is still streaming to us so discard
# except AttributeError: # and report depending on local ctx state.
case Yield(): case Yield():
# if 'yield' in msg:
# far end task is still streaming to us so discard
# and report per local context state.
if ( if (
(ctx._stream.closed (ctx._stream.closed
and (reason := 'stream was already closed') and (reason := 'stream was already closed')
@ -257,45 +252,34 @@ async def _drain_to_final_msg(
) )
continue continue
# stream terminated, but no result yet..
#
# TODO: work out edge cases here where # TODO: work out edge cases here where
# a stream is open but the task also calls # a stream is open but the task also calls
# this? # this?
# -[ ] should be a runtime error if a stream is open right? # -[ ] should be a runtime error if a stream is open right?
# Stop() # Stop()
case Stop(): case Stop():
# elif 'stop' in msg:
log.cancel( log.cancel(
'Remote stream terminated due to "stop" msg:\n\n' 'Remote stream terminated due to "stop" msg:\n\n'
f'{pformat(msg)}\n' f'{pformat(msg)}\n'
) )
continue continue
# It's an internal error if any other msg type without # remote error msg, likely already handled inside
# a`'cid'` field arrives here! # `Context._deliver_msg()`
case _: case Error():
# if not msg.get('cid'):
if not msg.cid:
raise InternalError(
'Unexpected cid-missing msg?\n\n'
f'{msg}\n'
)
# XXX fallthrough to handle expected error XXX # TODO: can we replace this with `ctx.maybe_raise()`?
# TODO: replace this with `ctx.maybe_raise()` # -[ ] would this be handier for this case maybe?
# async with maybe_raise_on_exit() as raises:
# if raises:
# log.error('some msg about raising..')
# #
# TODO: would this be handier for this case maybe?
# async with maybe_raise_on_exit() as raises:
# if raises:
# log.error('some msg about raising..')
re: Exception|None = ctx._remote_error re: Exception|None = ctx._remote_error
if re: if re:
log.critical(
'Remote ctx terminated due to "error" msg:\n'
f'{re}'
)
assert msg is ctx._cancel_msg assert msg is ctx._cancel_msg
# NOTE: this solved a super dupe edge case XD # NOTE: this solved a super duper edge case XD
# this was THE super duper edge case of: # this was THE super duper edge case of:
# - local task opens a remote task, # - local task opens a remote task,
# - requests remote cancellation of far end # - requests remote cancellation of far end
@ -312,9 +296,10 @@ async def _drain_to_final_msg(
# does not re-raise any ctxc it receives # does not re-raise any ctxc it receives
# IFF **it** was the cancellation # IFF **it** was the cancellation
# requester.. # requester..
# will raise if necessary, ow break from #
# loop presuming any error terminates the # XXX will raise if necessary but ow break
# context! # from loop presuming any supressed error
# (ctxc) should terminate the context!
ctx._maybe_raise_remote_err( ctx._maybe_raise_remote_err(
re, re,
# NOTE: obvi we don't care if we # NOTE: obvi we don't care if we
@ -338,6 +323,7 @@ async def _drain_to_final_msg(
log.critical('SHOULD NEVER GET HERE!?') log.critical('SHOULD NEVER GET HERE!?')
assert msg is ctx._cancel_msg assert msg is ctx._cancel_msg
assert error.msgdata == ctx._remote_error.msgdata assert error.msgdata == ctx._remote_error.msgdata
assert error.ipc_msg == ctx._remote_error.ipc_msg
from .devx._debug import pause from .devx._debug import pause
await pause() await pause()
ctx._maybe_cancel_and_set_remote_error(error) ctx._maybe_cancel_and_set_remote_error(error)
@ -346,6 +332,20 @@ async def _drain_to_final_msg(
else: else:
# bubble the original src key error # bubble the original src key error
raise raise
# XXX should pretty much never get here unless someone
# overrides the default `MsgType` spec.
case _:
# It's definitely an internal error if any other
# msg type without a`'cid'` field arrives here!
if not msg.cid:
raise InternalError(
'Unexpected cid-missing msg?\n\n'
f'{msg}\n'
)
raise RuntimeError('Unknown msg type: {msg}')
else: else:
log.cancel( log.cancel(
'Skipping `MsgStream` drain since final outcome is set\n\n' 'Skipping `MsgStream` drain since final outcome is set\n\n'
@ -1342,8 +1342,11 @@ class Context:
# `._cancel_called == True`. # `._cancel_called == True`.
not raise_overrun_from_self not raise_overrun_from_self
and isinstance(remote_error, RemoteActorError) and isinstance(remote_error, RemoteActorError)
and remote_error.msgdata['boxed_type_str'] == 'StreamOverrun'
and tuple(remote_error.msgdata['sender']) == our_uid and remote_error.boxed_type_str == 'StreamOverrun'
# and tuple(remote_error.msgdata['sender']) == our_uid
and tuple(remote_error.sender) == our_uid
): ):
# NOTE: we set the local scope error to any "self # NOTE: we set the local scope error to any "self
# cancellation" error-response thus "absorbing" # cancellation" error-response thus "absorbing"
@ -1412,16 +1415,11 @@ class Context:
assert self._recv_chan assert self._recv_chan
raise_overrun: bool = not self._allow_overruns raise_overrun: bool = not self._allow_overruns
# res_placeholder: int = id(self)
if ( if (
# self._result == res_placeholder
# and not self._remote_error
self.maybe_error is None self.maybe_error is None
# not self._remote_error and
# and not self._local_error not self._recv_chan._closed # type: ignore
and not self._recv_chan._closed # type: ignore
): ):
# wait for a final context result/error by "draining" # wait for a final context result/error by "draining"
# (by more or less ignoring) any bi-dir-stream "yield" # (by more or less ignoring) any bi-dir-stream "yield"
# msgs still in transit from the far end. # msgs still in transit from the far end.
@ -1432,7 +1430,6 @@ class Context:
for msg in drained_msgs: for msg in drained_msgs:
# TODO: mask this by default.. # TODO: mask this by default..
# if 'return' in msg:
if isinstance(msg, Return): if isinstance(msg, Return):
# from .devx import pause # from .devx import pause
# await pause() # await pause()
@ -1448,6 +1445,9 @@ class Context:
) )
self.maybe_raise( self.maybe_raise(
# NOTE: obvi we don't care if we
# overran the far end if we're already
# waiting on a final result (msg).
raise_overrun_from_self=( raise_overrun_from_self=(
raise_overrun raise_overrun
and and
@ -1458,34 +1458,12 @@ class Context:
(not self._cancel_called) (not self._cancel_called)
) )
) )
# if (
# (re := self._remote_error)
# # and self._result == res_placeholder
# ):
# self._maybe_raise_remote_err(
# re,
# # NOTE: obvi we don't care if we
# # overran the far end if we're already
# # waiting on a final result (msg).
# # raise_overrun_from_self=False,
# raise_overrun_from_self=(
# raise_overrun
# and
# # only when we ARE NOT the canceller
# # should we raise overruns, bc ow we're
# # raising something we know might happen
# # during cancellation ;)
# (not self._cancel_called)
# ),
# )
# if maybe_err:
# self._result = maybe_err
return self.outcome return self.outcome
# TODO: switch this with above which should be named # TODO: switch this with above!
# `.wait_for_outcome()` and instead do # -[ ] should be named `.wait_for_outcome()` and instead do
# a `.outcome.Outcome.unwrap()` ? # a `.outcome.Outcome.unwrap()` ?
#
# @property # @property
# def result(self) -> Any|None: # def result(self) -> Any|None:
# if self._final_result_is_set(): # if self._final_result_is_set():
@ -1544,7 +1522,6 @@ class Context:
return None return None
def _final_result_is_set(self) -> bool: def _final_result_is_set(self) -> bool:
# return not (self._result == id(self))
return self._result is not Unresolved return self._result is not Unresolved
# def get_result_nowait(self) -> Any|None: # def get_result_nowait(self) -> Any|None:
@ -1761,8 +1738,7 @@ class Context:
async def _deliver_msg( async def _deliver_msg(
self, self,
# msg: dict, msg: MsgType,
msg: Msg,
) -> bool: ) -> bool:
''' '''
@ -1776,6 +1752,20 @@ class Context:
`._scope_nursery: trio.Nursery`) which ensures that such `._scope_nursery: trio.Nursery`) which ensures that such
messages are queued up and eventually sent if possible. messages are queued up and eventually sent if possible.
XXX RULES XXX
------ - ------
- NEVER raise remote errors from this method; a runtime task caller.
An error "delivered" to a ctx should always be raised by
the corresponding local task operating on the
`Portal`/`Context` APIs.
- NEVER `return` early before delivering the msg!
bc if the error is a ctxc and there is a task waiting on
`.result()` we need the msg to be
`send_chan.send_nowait()`-ed over the `._recv_chan` so
that the error is relayed to that waiter task and thus
raised in user code!
''' '''
cid: str = self.cid cid: str = self.cid
chan: Channel = self.chan chan: Channel = self.chan
@ -1806,28 +1796,14 @@ class Context:
) )
self._cancel_msg: dict = msg self._cancel_msg: dict = msg
# NOTE: this will not raise an error, merely set # XXX NOTE: this will not raise an error, merely set
# `._remote_error` and maybe cancel any task currently # `._remote_error` and maybe cancel any task currently
# entered in `Portal.open_context()` presuming the # entered in `Portal.open_context()` presuming the
# error is "cancel causing" (i.e. a `ContextCancelled` # error is "cancel causing" (i.e. a `ContextCancelled`
# or `RemoteActorError`). # or `RemoteActorError`).
self._maybe_cancel_and_set_remote_error(re) self._maybe_cancel_and_set_remote_error(re)
# XXX NEVER do this XXX..!! # XXX only case where returning early is fine!
# bc if the error is a ctxc and there is a task
# waiting on `.result()` we need the msg to be sent
# over the `send_chan`/`._recv_chan` so that the error
# is relayed to that waiter task..
# return True
#
# XXX ALSO NO!! XXX
# => NEVER raise remote errors from the calling
# runtime task, they should always be raised by
# consumer side tasks operating on the
# `Portal`/`Context` APIs.
# if self._remote_error:
# self._maybe_raise_remote_err(error)
if self._in_overrun: if self._in_overrun:
log.warning( log.warning(
f'Queueing OVERRUN msg on caller task:\n' f'Queueing OVERRUN msg on caller task:\n'
@ -1946,31 +1922,27 @@ class Context:
# anything different. # anything different.
return False return False
else: else:
# txt += f'\n{msg}\n'
# raise local overrun and immediately pack as IPC # raise local overrun and immediately pack as IPC
# msg for far end. # msg for far end.
try: err_msg: Error = pack_from_raise(
raise StreamOverrun( local_err=StreamOverrun(
txt, txt,
sender=from_uid, sender=from_uid,
) ),
except StreamOverrun as err: cid=cid,
err_msg: dict[str, dict] = pack_error( )
err, try:
cid=cid, # relay condition to sender side remote task
) await chan.send(err_msg)
try: return True
# relay condition to sender side remote task
await chan.send(err_msg)
return True
except trio.BrokenResourceError: # XXX: local consumer has closed their side of
# XXX: local consumer has closed their side # the IPC so cancel the far end streaming task
# so cancel the far end streaming task except trio.BrokenResourceError:
log.warning( log.warning(
'Channel for ctx is already closed?\n' 'Channel for ctx is already closed?\n'
f'|_{chan}\n' f'|_{chan}\n'
) )
# ow, indicate unable to deliver by default # ow, indicate unable to deliver by default
return False return False
@ -2379,28 +2351,17 @@ async def open_context_from_portal(
# an exception type boxed in a `RemoteActorError` # an exception type boxed in a `RemoteActorError`
# is returned (meaning it was obvi not raised) # is returned (meaning it was obvi not raised)
# that we want to log-report on. # that we want to log-report on.
msgdata: str|None = getattr( match result_or_err:
result_or_err, case ContextCancelled() as ctxc:
'msgdata', log.cancel(ctxc.tb_str)
None
)
match (msgdata, result_or_err):
case (
{'tb_str': tbstr},
ContextCancelled(),
):
log.cancel(tbstr)
case ( case RemoteActorError() as rae:
{'tb_str': tbstr},
RemoteActorError(),
):
log.exception( log.exception(
'Context remotely errored!\n' 'Context remotely errored!\n'
f'<= peer: {uid}\n' f'<= peer: {uid}\n'
f' |_ {nsf}()\n\n' f' |_ {nsf}()\n\n'
f'{tbstr}' f'{rae.tb_str}'
) )
case (None, _): case (None, _):
log.runtime( log.runtime(
@ -2410,7 +2371,6 @@ async def open_context_from_portal(
f'`{result_or_err}`\n' f'`{result_or_err}`\n'
) )
finally: finally:
# XXX: (MEGA IMPORTANT) if this is a root opened process we # XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the # wait for any immediate child in debug before popping the

View File

@ -46,6 +46,7 @@ from ._state import (
from ._ipc import Channel from ._ipc import Channel
from .log import get_logger from .log import get_logger
from .msg import ( from .msg import (
Error,
NamespacePath, NamespacePath,
Return, Return,
) )
@ -69,8 +70,7 @@ log = get_logger(__name__)
# `._raise_from_no_key_in_msg()` (after tweak to # `._raise_from_no_key_in_msg()` (after tweak to
# accept a `chan: Channel` arg) in key block! # accept a `chan: Channel` arg) in key block!
def _unwrap_msg( def _unwrap_msg(
# msg: dict[str, Any], msg: Return|Error,
msg: Return,
channel: Channel, channel: Channel,
hide_tb: bool = True, hide_tb: bool = True,

View File

@ -49,7 +49,6 @@ from pprint import pformat
import signal import signal
import sys import sys
from typing import ( from typing import (
Any,
Callable, Callable,
TYPE_CHECKING, TYPE_CHECKING,
) )