Absorb EoCs via `Context.open_stream()` silently
I swear long ago it used to operate this way but, I guess this finalizes the design decision. It makes a lot more sense to *not* propagate any `trio.EndOfChannel` raised from a `Context.open_stream() as stream:` block when that EoC is due to graceful-explicit stream termination. We use the EoC much like a `StopAsyncIteration` where the error indicates termination of the stream due to either: - reception of a stop IPC msg indicating the far end ended the stream (gracecfully), - closure of the underlying `Context._recv_chan` either by the runtime or due to user code having called `MsgStream.aclose()`. User code shouldn't expect to handle EoC outside the block since the `@acm` having closed should indicate the exactly same lifetime state (of said stream) ;) Deats: - add special EoC handler in `.open_stream()` which silently "absorbs" the error only when the stream is already marked as closed (meaning the EoC indeed corresponds to IPC closure) with an assert for now ensuring the error is the same as set to `MsgStream._eoc`. - in `MsgStream.receive()` break up the handlers for EoC and `trio.ClosedResourceError` since the error instances are saved to different variables and we **don't** want to rewrite the exception in the eoc case (normally to mask `trio` internals in tbs) bc we need the instance to be the exact one for doing checks inside `.open_stream().__aexit__()` to absorb it. Other surrounding "improvements": - start using the new `Context.maybe_raise()` helper where it can easily replace existing equivalent block-sections. - use new `RemoteActorError.src_uid` as required.mv_to_new_trio_py3.11
parent
9221c57234
commit
668016d37b
|
@ -169,8 +169,7 @@ async def _drain_to_final_msg(
|
||||||
# only when we are sure the remote error is
|
# only when we are sure the remote error is
|
||||||
# the source cause of this local task's
|
# the source cause of this local task's
|
||||||
# cancellation.
|
# cancellation.
|
||||||
if re := ctx._remote_error:
|
ctx.maybe_raise()
|
||||||
ctx._maybe_raise_remote_err(re)
|
|
||||||
|
|
||||||
# CASE 1: we DID request the cancel we simply
|
# CASE 1: we DID request the cancel we simply
|
||||||
# continue to bubble up as normal.
|
# continue to bubble up as normal.
|
||||||
|
@ -257,6 +256,13 @@ async def _drain_to_final_msg(
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX fallthrough to handle expected error XXX
|
# XXX fallthrough to handle expected error XXX
|
||||||
|
# TODO: replace this with `ctx.maybe_raise()`
|
||||||
|
#
|
||||||
|
# TODO: would this be handier for this case maybe?
|
||||||
|
# async with maybe_raise_on_exit() as raises:
|
||||||
|
# if raises:
|
||||||
|
# log.error('some msg about raising..')
|
||||||
|
|
||||||
re: Exception|None = ctx._remote_error
|
re: Exception|None = ctx._remote_error
|
||||||
if re:
|
if re:
|
||||||
log.critical(
|
log.critical(
|
||||||
|
@ -595,7 +601,7 @@ class Context:
|
||||||
if not re:
|
if not re:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if from_uid := re.src_actor_uid:
|
if from_uid := re.src_uid:
|
||||||
from_uid: tuple = tuple(from_uid)
|
from_uid: tuple = tuple(from_uid)
|
||||||
|
|
||||||
our_uid: tuple = self._actor.uid
|
our_uid: tuple = self._actor.uid
|
||||||
|
@ -825,7 +831,7 @@ class Context:
|
||||||
# cancellation.
|
# cancellation.
|
||||||
maybe_error_src: tuple = getattr(
|
maybe_error_src: tuple = getattr(
|
||||||
error,
|
error,
|
||||||
'src_actor_uid',
|
'src_uid',
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
self._canceller = (
|
self._canceller = (
|
||||||
|
@ -1030,8 +1036,8 @@ class Context:
|
||||||
@acm
|
@acm
|
||||||
async def open_stream(
|
async def open_stream(
|
||||||
self,
|
self,
|
||||||
allow_overruns: bool | None = False,
|
allow_overruns: bool|None = False,
|
||||||
msg_buffer_size: int | None = None,
|
msg_buffer_size: int|None = None,
|
||||||
|
|
||||||
) -> AsyncGenerator[MsgStream, None]:
|
) -> AsyncGenerator[MsgStream, None]:
|
||||||
'''
|
'''
|
||||||
|
@ -1071,13 +1077,16 @@ class Context:
|
||||||
# absorbed there (silently) and we DO NOT want to
|
# absorbed there (silently) and we DO NOT want to
|
||||||
# actually try to stream - a cancel msg was already
|
# actually try to stream - a cancel msg was already
|
||||||
# sent to the other side!
|
# sent to the other side!
|
||||||
if self._remote_error:
|
self.maybe_raise(
|
||||||
# NOTE: this is diff then calling
|
raise_ctxc_from_self_call=True,
|
||||||
# `._maybe_raise_remote_err()` specifically
|
)
|
||||||
# because any task entering this `.open_stream()`
|
# NOTE: this is diff then calling
|
||||||
# AFTER cancellation has already been requested,
|
# `._maybe_raise_remote_err()` specifically
|
||||||
# we DO NOT want to absorb any ctxc ACK silently!
|
# because we want to raise a ctxc on any task entering this `.open_stream()`
|
||||||
raise self._remote_error
|
# AFTER cancellation was already been requested,
|
||||||
|
# we DO NOT want to absorb any ctxc ACK silently!
|
||||||
|
# if self._remote_error:
|
||||||
|
# raise self._remote_error
|
||||||
|
|
||||||
# XXX NOTE: if no `ContextCancelled` has been responded
|
# XXX NOTE: if no `ContextCancelled` has been responded
|
||||||
# back from the other side (yet), we raise a different
|
# back from the other side (yet), we raise a different
|
||||||
|
@ -1158,7 +1167,6 @@ class Context:
|
||||||
# await trio.lowlevel.checkpoint()
|
# await trio.lowlevel.checkpoint()
|
||||||
yield stream
|
yield stream
|
||||||
|
|
||||||
|
|
||||||
# XXX: (MEGA IMPORTANT) if this is a root opened process we
|
# XXX: (MEGA IMPORTANT) if this is a root opened process we
|
||||||
# wait for any immediate child in debug before popping the
|
# wait for any immediate child in debug before popping the
|
||||||
# context from the runtime msg loop otherwise inside
|
# context from the runtime msg loop otherwise inside
|
||||||
|
@ -1183,12 +1191,23 @@ class Context:
|
||||||
#
|
#
|
||||||
# await stream.aclose()
|
# await stream.aclose()
|
||||||
|
|
||||||
# if re := ctx._remote_error:
|
# NOTE: absorb and do not raise any
|
||||||
# ctx._maybe_raise_remote_err(
|
# EoC received from the other side such that
|
||||||
# re,
|
# it is not raised inside the surrounding
|
||||||
# raise_ctxc_from_self_call=True,
|
# context block's scope!
|
||||||
# )
|
except trio.EndOfChannel as eoc:
|
||||||
# await trio.lowlevel.checkpoint()
|
if (
|
||||||
|
eoc
|
||||||
|
and stream.closed
|
||||||
|
):
|
||||||
|
# sanity, can remove?
|
||||||
|
assert eoc is stream._eoc
|
||||||
|
# from .devx import pause
|
||||||
|
# await pause()
|
||||||
|
log.warning(
|
||||||
|
'Stream was terminated by EoC\n\n'
|
||||||
|
f'{repr(eoc)}\n'
|
||||||
|
)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if self._portal:
|
if self._portal:
|
||||||
|
@ -1204,7 +1223,6 @@ class Context:
|
||||||
# TODO: replace all the instances of this!! XD
|
# TODO: replace all the instances of this!! XD
|
||||||
def maybe_raise(
|
def maybe_raise(
|
||||||
self,
|
self,
|
||||||
|
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
|
@ -1388,33 +1406,41 @@ class Context:
|
||||||
f'{drained_msgs}'
|
f'{drained_msgs}'
|
||||||
)
|
)
|
||||||
|
|
||||||
if (
|
self.maybe_raise(
|
||||||
(re := self._remote_error)
|
raise_overrun_from_self=(
|
||||||
# and self._result == res_placeholder
|
raise_overrun
|
||||||
):
|
and
|
||||||
self._maybe_raise_remote_err(
|
# only when we ARE NOT the canceller
|
||||||
re,
|
# should we raise overruns, bc ow we're
|
||||||
# NOTE: obvi we don't care if we
|
# raising something we know might happen
|
||||||
# overran the far end if we're already
|
# during cancellation ;)
|
||||||
# waiting on a final result (msg).
|
(not self._cancel_called)
|
||||||
# raise_overrun_from_self=False,
|
|
||||||
raise_overrun_from_self=(
|
|
||||||
raise_overrun
|
|
||||||
and
|
|
||||||
# only when we ARE NOT the canceller
|
|
||||||
# should we raise overruns, bc ow we're
|
|
||||||
# raising something we know might happen
|
|
||||||
# during cancellation ;)
|
|
||||||
(not self._cancel_called)
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
|
)
|
||||||
|
# if (
|
||||||
|
# (re := self._remote_error)
|
||||||
|
# # and self._result == res_placeholder
|
||||||
|
# ):
|
||||||
|
# self._maybe_raise_remote_err(
|
||||||
|
# re,
|
||||||
|
# # NOTE: obvi we don't care if we
|
||||||
|
# # overran the far end if we're already
|
||||||
|
# # waiting on a final result (msg).
|
||||||
|
# # raise_overrun_from_self=False,
|
||||||
|
# raise_overrun_from_self=(
|
||||||
|
# raise_overrun
|
||||||
|
# and
|
||||||
|
# # only when we ARE NOT the canceller
|
||||||
|
# # should we raise overruns, bc ow we're
|
||||||
|
# # raising something we know might happen
|
||||||
|
# # during cancellation ;)
|
||||||
|
# (not self._cancel_called)
|
||||||
|
# ),
|
||||||
|
# )
|
||||||
# if maybe_err:
|
# if maybe_err:
|
||||||
# self._result = maybe_err
|
# self._result = maybe_err
|
||||||
|
|
||||||
return self.outcome
|
return self.outcome
|
||||||
# None if self._result == res_placeholder
|
|
||||||
# else self._result
|
|
||||||
# )
|
|
||||||
|
|
||||||
# TODO: switch this with above which should be named
|
# TODO: switch this with above which should be named
|
||||||
# `.wait_for_outcome()` and instead do
|
# `.wait_for_outcome()` and instead do
|
||||||
|
@ -1863,8 +1889,9 @@ async def open_context_from_portal(
|
||||||
|
|
||||||
# TODO: if we set this the wrapping `@acm` body will
|
# TODO: if we set this the wrapping `@acm` body will
|
||||||
# still be shown (awkwardly) on pdb REPL entry. Ideally
|
# still be shown (awkwardly) on pdb REPL entry. Ideally
|
||||||
# we can similarly annotate that frame to NOT show?
|
# we can similarly annotate that frame to NOT show? for now
|
||||||
hide_tb: bool = True,
|
# we DO SHOW this frame since it's awkward ow..
|
||||||
|
hide_tb: bool = False,
|
||||||
|
|
||||||
# proxied to RPC
|
# proxied to RPC
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
|
@ -136,7 +136,7 @@ class MsgStream(trio.abc.Channel):
|
||||||
# return await self.receive()
|
# return await self.receive()
|
||||||
# except trio.EndOfChannel:
|
# except trio.EndOfChannel:
|
||||||
# raise StopAsyncIteration
|
# raise StopAsyncIteration
|
||||||
|
#
|
||||||
# see ``.aclose()`` for notes on the old behaviour prior to
|
# see ``.aclose()`` for notes on the old behaviour prior to
|
||||||
# introducing this
|
# introducing this
|
||||||
if self._eoc:
|
if self._eoc:
|
||||||
|
@ -152,7 +152,6 @@ class MsgStream(trio.abc.Channel):
|
||||||
return msg['yield']
|
return msg['yield']
|
||||||
|
|
||||||
except KeyError as kerr:
|
except KeyError as kerr:
|
||||||
# log.exception('GOT KEYERROR')
|
|
||||||
src_err = kerr
|
src_err = kerr
|
||||||
|
|
||||||
# NOTE: may raise any of the below error types
|
# NOTE: may raise any of the below error types
|
||||||
|
@ -166,30 +165,20 @@ class MsgStream(trio.abc.Channel):
|
||||||
stream=self,
|
stream=self,
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX: we close the stream on any of these error conditions:
|
# XXX: the stream terminates on either of:
|
||||||
|
# - via `self._rx_chan.receive()` raising after manual closure
|
||||||
|
# by the rpc-runtime OR,
|
||||||
|
# - via a received `{'stop': ...}` msg from remote side.
|
||||||
|
# |_ NOTE: previously this was triggered by calling
|
||||||
|
# ``._rx_chan.aclose()`` on the send side of the channel inside
|
||||||
|
# `Actor._push_result()`, but now the 'stop' message handling
|
||||||
|
# has been put just above inside `_raise_from_no_key_in_msg()`.
|
||||||
except (
|
except (
|
||||||
# trio.ClosedResourceError, # by self._rx_chan
|
trio.EndOfChannel,
|
||||||
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
|
|
||||||
) as eoc:
|
) as eoc:
|
||||||
# log.exception('GOT EOC')
|
|
||||||
src_err = eoc
|
src_err = eoc
|
||||||
self._eoc = eoc
|
self._eoc = eoc
|
||||||
|
|
||||||
# a ``ClosedResourceError`` indicates that the internal
|
|
||||||
# feeder memory receive channel was closed likely by the
|
|
||||||
# runtime after the associated transport-channel
|
|
||||||
# disconnected or broke.
|
|
||||||
|
|
||||||
# an ``EndOfChannel`` indicates either the internal recv
|
|
||||||
# memchan exhausted **or** we raisesd it just above after
|
|
||||||
# receiving a `stop` message from the far end of the stream.
|
|
||||||
|
|
||||||
# Previously this was triggered by calling ``.aclose()`` on
|
|
||||||
# the send side of the channel inside
|
|
||||||
# ``Actor._push_result()`` (should still be commented code
|
|
||||||
# there - which should eventually get removed), but now the
|
|
||||||
# 'stop' message handling has been put just above.
|
|
||||||
|
|
||||||
# TODO: Locally, we want to close this stream gracefully, by
|
# TODO: Locally, we want to close this stream gracefully, by
|
||||||
# terminating any local consumers tasks deterministically.
|
# terminating any local consumers tasks deterministically.
|
||||||
# Once we have broadcast support, we **don't** want to be
|
# Once we have broadcast support, we **don't** want to be
|
||||||
|
@ -210,8 +199,11 @@ class MsgStream(trio.abc.Channel):
|
||||||
|
|
||||||
# raise eoc
|
# raise eoc
|
||||||
|
|
||||||
except trio.ClosedResourceError as cre: # by self._rx_chan
|
# a ``ClosedResourceError`` indicates that the internal
|
||||||
# log.exception('GOT CRE')
|
# feeder memory receive channel was closed likely by the
|
||||||
|
# runtime after the associated transport-channel
|
||||||
|
# disconnected or broke.
|
||||||
|
except trio.ClosedResourceError as cre: # by self._rx_chan.receive()
|
||||||
src_err = cre
|
src_err = cre
|
||||||
log.warning(
|
log.warning(
|
||||||
'`Context._rx_chan` was already closed?'
|
'`Context._rx_chan` was already closed?'
|
||||||
|
@ -237,15 +229,30 @@ class MsgStream(trio.abc.Channel):
|
||||||
# over the end-of-stream connection error since likely
|
# over the end-of-stream connection error since likely
|
||||||
# the remote error was the source cause?
|
# the remote error was the source cause?
|
||||||
ctx: Context = self._ctx
|
ctx: Context = self._ctx
|
||||||
if re := ctx._remote_error:
|
ctx.maybe_raise(
|
||||||
ctx._maybe_raise_remote_err(
|
raise_ctxc_from_self_call=True,
|
||||||
re,
|
)
|
||||||
raise_ctxc_from_self_call=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
# propagate any error but hide low-level frames from
|
# propagate any error but hide low-level frame details
|
||||||
# caller by default.
|
# from the caller by default for debug noise reduction.
|
||||||
if hide_tb:
|
if (
|
||||||
|
hide_tb
|
||||||
|
|
||||||
|
# XXX NOTE XXX don't reraise on certain
|
||||||
|
# stream-specific internal error types like,
|
||||||
|
#
|
||||||
|
# - `trio.EoC` since we want to use the exact instance
|
||||||
|
# to ensure that it is the error that bubbles upward
|
||||||
|
# for silent absorption by `Context.open_stream()`.
|
||||||
|
and not self._eoc
|
||||||
|
|
||||||
|
# - `RemoteActorError` (or `ContextCancelled`) if it gets
|
||||||
|
# raised from `_raise_from_no_key_in_msg()` since we
|
||||||
|
# want the same (as the above bullet) for any
|
||||||
|
# `.open_context()` block bubbled error raised by
|
||||||
|
# any nearby ctx API remote-failures.
|
||||||
|
# and not isinstance(src_err, RemoteActorError)
|
||||||
|
):
|
||||||
raise type(src_err)(*src_err.args) from src_err
|
raise type(src_err)(*src_err.args) from src_err
|
||||||
else:
|
else:
|
||||||
raise src_err
|
raise src_err
|
||||||
|
@ -370,6 +377,10 @@ class MsgStream(trio.abc.Channel):
|
||||||
# await rx_chan.aclose()
|
# await rx_chan.aclose()
|
||||||
|
|
||||||
if not self._eoc:
|
if not self._eoc:
|
||||||
|
log.cancel(
|
||||||
|
'Stream closed before it received an EoC?\n'
|
||||||
|
'Setting eoc manually..\n..'
|
||||||
|
)
|
||||||
self._eoc: bool = trio.EndOfChannel(
|
self._eoc: bool = trio.EndOfChannel(
|
||||||
f'Context stream closed by {self._ctx.side}\n'
|
f'Context stream closed by {self._ctx.side}\n'
|
||||||
f'|_{self}\n'
|
f'|_{self}\n'
|
||||||
|
@ -414,13 +425,11 @@ class MsgStream(trio.abc.Channel):
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def closed(self) -> bool:
|
def closed(self) -> bool:
|
||||||
if (
|
|
||||||
(rxc := self._rx_chan._closed)
|
rxc: bool = self._rx_chan._closed
|
||||||
or
|
_closed: bool|Exception = self._closed
|
||||||
(_closed := self._closed)
|
_eoc: bool|trio.EndOfChannel = self._eoc
|
||||||
or
|
if rxc or _closed or _eoc:
|
||||||
(_eoc := self._eoc)
|
|
||||||
):
|
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'`MsgStream` is already closed\n'
|
f'`MsgStream` is already closed\n'
|
||||||
f'{self}\n'
|
f'{self}\n'
|
||||||
|
@ -496,7 +505,11 @@ class MsgStream(trio.abc.Channel):
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
|
# raise any alreay known error immediately
|
||||||
self._ctx.maybe_raise()
|
self._ctx.maybe_raise()
|
||||||
|
if self._eoc:
|
||||||
|
raise self._eoc
|
||||||
|
|
||||||
if self._closed:
|
if self._closed:
|
||||||
raise self._closed
|
raise self._closed
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue