Attempt at better internal traceback hiding

Previously i was trying to approach this using lots of
`__tracebackhide__`'s in various internal funcs but since it's not
exactly straight forward to do this inside core deps like `trio` and the
stdlib, it makes a bit more sense to optionally catch and re-raise
certain classes of errors from their originals using `raise from` syntax
as per:
https://docs.python.org/3/library/exceptions.html#exception-context

Deats:
- litter `._context` methods with `__tracebackhide__`/`hide_tb` which
  were previously being shown but that don't need to be to application
  code now that cancel semantics testing is finished up.
- i originally did the same but later commented it all out in `._ipc`
  since error catch and re-raise instead in higher level layers
  (above the transport) seems to be a much saner approach.
- add catch-n-reraise-from in `MsgStream.send()`/.`receive()` to avoid
  seeing the depths of `trio` and/or our `._ipc` layers on comms errors.

Further this patch adds some refactoring to use the
same remote-error shipper routine from both the actor-core in the RPC
invoker:
- rename it as `try_ship_error_to_remote()` and call it from
  `._invoke()` as well as it's prior usage.
- make it optionally accept `cid: str` a `remote_descr: str` and of
  course a `hide_tb: bool`.

Other misc tweaks:
- add some todo notes around `Actor.load_modules()` debug hooking.
- tweak the zombie reaper log msg and timeout value ;)
ctx_cancel_semantics_and_overruns
Tyler Goodlet 2024-03-13 09:55:47 -04:00
parent 389b305d3b
commit 544cb40533
6 changed files with 150 additions and 471 deletions

View File

@ -1198,8 +1198,12 @@ class Context:
# TODO: replace all the instances of this!! XD # TODO: replace all the instances of this!! XD
def maybe_raise( def maybe_raise(
self, self,
hide_tb: bool = True,
**kwargs, **kwargs,
) -> Exception|None: ) -> Exception|None:
__tracebackhide__: bool = hide_tb
if re := self._remote_error: if re := self._remote_error:
return self._maybe_raise_remote_err( return self._maybe_raise_remote_err(
re, re,
@ -1209,8 +1213,10 @@ class Context:
def _maybe_raise_remote_err( def _maybe_raise_remote_err(
self, self,
remote_error: Exception, remote_error: Exception,
raise_ctxc_from_self_call: bool = False, raise_ctxc_from_self_call: bool = False,
raise_overrun_from_self: bool = True, raise_overrun_from_self: bool = True,
hide_tb: bool = True,
) -> ( ) -> (
ContextCancelled # `.cancel()` request to far side ContextCancelled # `.cancel()` request to far side
@ -1222,6 +1228,7 @@ class Context:
a cancellation (if any). a cancellation (if any).
''' '''
__tracebackhide__: bool = hide_tb
our_uid: tuple = self.chan.uid our_uid: tuple = self.chan.uid
# XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
@ -1305,7 +1312,7 @@ class Context:
# TODO: change to `.wait_for_result()`? # TODO: change to `.wait_for_result()`?
async def result( async def result(
self, self,
hide_tb: bool = False, hide_tb: bool = True,
) -> Any|Exception: ) -> Any|Exception:
''' '''

View File

@ -19,13 +19,14 @@ Inter-process comms abstractions
""" """
from __future__ import annotations from __future__ import annotations
import struct
import platform
from pprint import pformat
from collections.abc import ( from collections.abc import (
AsyncGenerator, AsyncGenerator,
AsyncIterator, AsyncIterator,
) )
from contextlib import asynccontextmanager as acm
import platform
from pprint import pformat
import struct
import typing import typing
from typing import ( from typing import (
Any, Any,
@ -35,18 +36,16 @@ from typing import (
TypeVar, TypeVar,
) )
from tricycle import BufferedReceiveStream
import msgspec import msgspec
from tricycle import BufferedReceiveStream
import trio import trio
from async_generator import asynccontextmanager
from .log import get_logger from tractor.log import get_logger
from ._exceptions import TransportClosed from tractor._exceptions import TransportClosed
log = get_logger(__name__) log = get_logger(__name__)
_is_windows = platform.system() == 'Windows' _is_windows = platform.system() == 'Windows'
log = get_logger(__name__)
def get_stream_addrs(stream: trio.SocketStream) -> tuple: def get_stream_addrs(stream: trio.SocketStream) -> tuple:
@ -206,7 +205,17 @@ class MsgpackTCPStream(MsgTransport):
else: else:
raise raise
async def send(self, msg: Any) -> None: async def send(
self,
msg: Any,
# hide_tb: bool = False,
) -> None:
'''
Send a msgpack coded blob-as-msg over TCP.
'''
# __tracebackhide__: bool = hide_tb
async with self._send_lock: async with self._send_lock:
bytes_data: bytes = self.encode(msg) bytes_data: bytes = self.encode(msg)
@ -388,15 +397,28 @@ class Channel:
) )
return transport return transport
async def send(self, item: Any) -> None: async def send(
self,
payload: Any,
# hide_tb: bool = False,
) -> None:
'''
Send a coded msg-blob over the transport.
'''
# __tracebackhide__: bool = hide_tb
log.transport( log.transport(
'=> send IPC msg:\n\n' '=> send IPC msg:\n\n'
f'{pformat(item)}\n' f'{pformat(payload)}\n'
) # type: ignore ) # type: ignore
assert self._transport assert self._transport
await self._transport.send(item) await self._transport.send(
payload,
# hide_tb=hide_tb,
)
async def recv(self) -> Any: async def recv(self) -> Any:
assert self._transport assert self._transport
@ -493,7 +515,7 @@ class Channel:
return self._transport.connected() if self._transport else False return self._transport.connected() if self._transport else False
@asynccontextmanager @acm
async def _connect_chan( async def _connect_chan(
host: str, port: int host: str, port: int
) -> typing.AsyncGenerator[Channel, None]: ) -> typing.AsyncGenerator[Channel, None]:

View File

@ -465,7 +465,7 @@ class Portal:
# TODO: if we set this the wrapping `@acm` body will # TODO: if we set this the wrapping `@acm` body will
# still be shown (awkwardly) on pdb REPL entry. Ideally # still be shown (awkwardly) on pdb REPL entry. Ideally
# we can similarly annotate that frame to NOT show? # we can similarly annotate that frame to NOT show?
hide_tb: bool = False, hide_tb: bool = True,
# proxied to RPC # proxied to RPC
**kwargs, **kwargs,

View File

@ -1,423 +1,4 @@
# tractor: structured concurrent "actors". tb = None
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
The fundamental core machinery implementing every "actor" including
the process-local (python-interpreter global) `Actor` state-type
primitive(s), RPC-in-task scheduling, and IPC connectivity and
low-level transport msg handling.
"""
from __future__ import annotations
from contextlib import (
ExitStack,
asynccontextmanager as acm,
)
from collections import defaultdict
from functools import partial
from itertools import chain
import importlib
import importlib.util
import inspect
from pprint import pformat
import signal
import sys
from typing import (
Any,
Callable,
Union,
Coroutine,
TYPE_CHECKING,
)
import uuid
from types import ModuleType
import os
import warnings
from async_generator import aclosing
from exceptiongroup import BaseExceptionGroup
import trio
from trio import (
CancelScope,
)
from trio_typing import (
Nursery,
TaskStatus,
)
from .msg import NamespacePath
from ._ipc import Channel
from ._context import (
mk_context,
Context,
)
from .log import get_logger
from ._exceptions import (
pack_error,
unpack_error,
ModuleNotExposed,
is_multi_cancelled,
ContextCancelled,
TransportClosed,
)
from . import _debug
from ._discovery import get_arbiter
from ._portal import Portal
from . import _state
from . import _mp_fixup_main
if TYPE_CHECKING:
from ._supervise import ActorNursery
log = get_logger('tractor')
_gb_mod: ModuleType|None|False = None
async def maybe_import_gb():
global _gb_mod
if _gb_mod is False:
return
try:
import greenback
_gb_mod = greenback
await greenback.ensure_portal()
except ModuleNotFoundError:
log.debug(
'`greenback` is not installed.\n'
'No sync debug support!\n'
)
_gb_mod = False
async def _invoke_non_context(
actor: Actor,
cancel_scope: CancelScope,
ctx: Context,
cid: str,
chan: Channel,
func: Callable,
coro: Coroutine,
kwargs: dict[str, Any],
treat_as_gen: bool,
is_rpc: bool,
task_status: TaskStatus[
Context | BaseException
] = trio.TASK_STATUS_IGNORED,
):
# TODO: can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro):
await chan.send({'functype': 'asyncgen', 'cid': cid})
# XXX: massive gotcha! If the containing scope
# is cancelled and we execute the below line,
# any ``ActorNursery.__aexit__()`` WON'T be
# triggered in the underlying async gen! So we
# have to properly handle the closing (aclosing)
# of the async gen in order to be sure the cancel
# is propagated!
with cancel_scope as cs:
ctx._scope = cs
task_status.started(ctx)
async with aclosing(coro) as agen:
async for item in agen:
# TODO: can we send values back in here?
# it's gonna require a `while True:` and
# some non-blocking way to retrieve new `asend()`
# values from the channel:
# to_send = await chan.recv_nowait()
# if to_send is not None:
# to_yield = await coro.asend(to_send)
await chan.send({'yield': item, 'cid': cid})
log.runtime(f"Finished iterating {coro}")
# TODO: we should really support a proper
# `StopAsyncIteration` system here for returning a final
# value if desired
await chan.send({'stop': True, 'cid': cid})
# one way @stream func that gets treated like an async gen
# TODO: can we unify this with the `context=True` impl below?
elif treat_as_gen:
await chan.send({'functype': 'asyncgen', 'cid': cid})
# XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must
# manualy construct the response dict-packet-responses as
# above
with cancel_scope as cs:
ctx._scope = cs
task_status.started(ctx)
await coro
if not cs.cancelled_caught:
# task was not cancelled so we can instruct the
# far end async gen to tear down
await chan.send({'stop': True, 'cid': cid})
else:
# regular async function/method
# XXX: possibly just a scheduled `Actor._cancel_task()`
# from a remote request to cancel some `Context`.
# ------ - ------
# TODO: ideally we unify this with the above `context=True`
# block such that for any remote invocation ftype, we
# always invoke the far end RPC task scheduling the same
# way: using the linked IPC context machinery.
failed_resp: bool = False
try:
await chan.send({
'functype': 'asyncfunc',
'cid': cid
})
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
) as ipc_err:
failed_resp = True
if is_rpc:
raise
else:
# TODO: should this be an `.exception()` call?
log.warning(
f'Failed to respond to non-rpc request: {func}\n'
f'{ipc_err}'
)
with cancel_scope as cs:
ctx._scope: CancelScope = cs
task_status.started(ctx)
result = await coro
fname: str = func.__name__
log.runtime(
'RPC complete:\n'
f'task: {ctx._task}\n'
f'|_cid={ctx.cid}\n'
f'|_{fname}() -> {pformat(result)}\n'
)
# NOTE: only send result if we know IPC isn't down
if (
not failed_resp
and chan.connected()
):
try:
await chan.send(
{'return': result,
'cid': cid}
)
except (
BrokenPipeError,
trio.BrokenResourceError,
):
log.warning(
'Failed to return result:\n'
f'{func}@{actor.uid}\n'
f'remote chan: {chan.uid}'
)
@acm
async def _errors_relayed_via_ipc(
actor: Actor,
chan: Channel,
ctx: Context,
is_rpc: bool,
hide_tb: bool = False,
debug_kbis: bool = False,
task_status: TaskStatus[
Context | BaseException
] = trio.TASK_STATUS_IGNORED,
) -> None:
__tracebackhide__: bool = hide_tb # TODO: use hide_tb here?
try:
yield # run RPC invoke body
# box and ship RPC errors for wire-transit via
# the task's requesting parent IPC-channel.
except (
Exception,
BaseExceptionGroup,
KeyboardInterrupt,
) as err:
# always hide this frame from debug REPL if the crash
# originated from an rpc task and we DID NOT fail due to
# an IPC transport error!
if (
is_rpc
and chan.connected()
):
__tracebackhide__: bool = hide_tb
if not is_multi_cancelled(err):
# TODO: maybe we'll want different "levels" of debugging
# eventualy such as ('app', 'supervisory', 'runtime') ?
# if not isinstance(err, trio.ClosedResourceError) and (
# if not is_multi_cancelled(err) and (
entered_debug: bool = False
if (
(
not isinstance(err, ContextCancelled)
or (
isinstance(err, ContextCancelled)
and ctx._cancel_called
# if the root blocks the debugger lock request from a child
# we will get a remote-cancelled condition.
and ctx._enter_debugger_on_cancel
)
)
and
(
not isinstance(err, KeyboardInterrupt)
or (
isinstance(err, KeyboardInterrupt)
and debug_kbis
)
)
):
# await _debug.pause()
# XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default?
# => I can't think of a reason that inspecting this
# type of failure will be useful for respawns or
# recovery logic - the only case is some kind of
# strange bug in our transport layer itself? Going
# to keep this open ended for now.
entered_debug = await _debug._maybe_enter_pm(err)
if not entered_debug:
log.exception('Actor crashed:\n')
# always ship errors back to caller
err_msg: dict[str, dict] = pack_error(
err,
# tb=tb, # TODO: special tb fmting?
cid=ctx.cid,
)
# NOTE: the src actor should always be packed into the
# error.. but how should we verify this?
# assert err_msg['src_actor_uid']
# if not err_msg['error'].get('src_actor_uid'):
# import pdbp; pdbp.set_trace()
if is_rpc:
try:
await chan.send(err_msg)
# TODO: tests for this scenario:
# - RPC caller closes connection before getting a response
# should **not** crash this actor..
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
) as ipc_err:
# if we can't propagate the error that's a big boo boo
log.exception(
f"Failed to ship error to caller @ {chan.uid} !?\n"
f'{ipc_err}'
)
# error is probably from above coro running code *not from
# the target rpc invocation since a scope was never
# allocated around the coroutine await.
if ctx._scope is None:
# we don't ever raise directly here to allow the
# msg-loop-scheduler to continue running for this
# channel.
task_status.started(err)
# always reraise KBIs so they propagate at the sys-process
# level.
if isinstance(err, KeyboardInterrupt):
raise
# RPC task bookeeping
finally:
try:
ctx, func, is_complete = actor._rpc_tasks.pop(
(chan, ctx.cid)
)
is_complete.set()
except KeyError:
if is_rpc:
# If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet
log.warning(
'RPC task likely errored or cancelled before start?'
f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n'
)
else:
log.cancel(
'Failed to de-alloc internal runtime cancel task?\n'
f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n'
)
finally:
if not actor._rpc_tasks:
log.runtime("All RPC tasks have completed")
actor._ongoing_rpc_tasks.set()
async def _invoke(
actor: 'Actor',
cid: str,
chan: Channel,
func: Callable,
kwargs: dict[str, Any],
is_rpc: bool = True,
hide_tb: bool = True,
task_status: TaskStatus[
Union[Context, BaseException]
] = trio.TASK_STATUS_IGNORED,
):
'''
Schedule a `trio` task-as-func and deliver result(s) over
connected IPC channel.
This is the core "RPC" `trio.Task` scheduling machinery used to start every
remotely invoked function, normally in `Actor._service_n: Nursery`.
'''
__tracebackhide__: bool = hide_tb
treat_as_gen: bool = False
# possibly a traceback (not sure what typing is for this..)
tb = None
cancel_scope = CancelScope() cancel_scope = CancelScope()
# activated cancel scope ref # activated cancel scope ref
@ -712,9 +293,13 @@ def _get_mod_abspath(module):
return os.path.abspath(module.__file__) return os.path.abspath(module.__file__)
async def try_ship_error_to_parent( async def try_ship_error_to_remote(
channel: Channel, channel: Channel,
err: Union[Exception, BaseExceptionGroup], err: Exception|BaseExceptionGroup,
cid: str|None = None,
remote_descr: str = 'parent',
hide_tb: bool = True,
) -> None: ) -> None:
''' '''
@ -723,22 +308,39 @@ async def try_ship_error_to_parent(
local cancellation ignored but logged as critical(ly bad). local cancellation ignored but logged as critical(ly bad).
''' '''
__tracebackhide__: bool = hide_tb
with CancelScope(shield=True): with CancelScope(shield=True):
try: try:
await channel.send( # NOTE: normally only used for internal runtime errors
# NOTE: normally only used for internal runtime errors # so ship to peer actor without a cid.
# so ship to peer actor without a cid. msg: dict = pack_error(
pack_error(err) err,
cid=cid,
# TODO: special tb fmting for ctxc cases?
# tb=tb,
) )
# NOTE: the src actor should always be packed into the
# error.. but how should we verify this?
# actor: Actor = _state.current_actor()
# assert err_msg['src_actor_uid']
# if not err_msg['error'].get('src_actor_uid'):
# import pdbp; pdbp.set_trace()
await channel.send(msg)
# XXX NOTE XXX in SC terms this is one of the worst things
# that can happen and provides for a 2-general's dilemma..
except ( except (
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
BrokenPipeError,
): ):
# in SC terms this is one of the worst things that can err_msg: dict = msg['error']['tb_str']
# happen and provides for a 2-general's dilemma..
log.critical( log.critical(
f'Failed to ship error to parent ' 'IPC transport failure -> '
f'{channel.uid}, IPC transport failure!' f'failed to ship error to {remote_descr}!\n\n'
f'X=> {channel.uid}\n\n'
f'{err_msg}\n'
) )
@ -896,7 +498,10 @@ class Actor:
log.runtime(f"{uid} successfully connected back to us") log.runtime(f"{uid} successfully connected back to us")
return event, self._peers[uid][-1] return event, self._peers[uid][-1]
def load_modules(self) -> None: def load_modules(
self,
debug_mode: bool = False,
) -> None:
''' '''
Load allowed RPC modules locally (after fork). Load allowed RPC modules locally (after fork).
@ -928,7 +533,9 @@ class Actor:
except ModuleNotFoundError: except ModuleNotFoundError:
# it is expected the corresponding `ModuleNotExposed` error # it is expected the corresponding `ModuleNotExposed` error
# will be raised later # will be raised later
log.error(f"Failed to import {modpath} in {self.name}") log.error(
f"Failed to import {modpath} in {self.name}"
)
raise raise
def _get_rpc_func(self, ns, funcname): def _get_rpc_func(self, ns, funcname):
@ -1759,7 +1366,7 @@ class Actor:
log.cancel( log.cancel(
'Cancel request for RPC task\n\n' 'Cancel request for RPC task\n\n'
f'<= Actor.cancel_task(): {requesting_uid}\n\n' f'<= Actor._cancel_task(): {requesting_uid}\n\n'
f'=> {ctx._task}\n' f'=> {ctx._task}\n'
f' |_ >> {ctx.repr_rpc}\n' f' |_ >> {ctx.repr_rpc}\n'
# f' >> Actor._cancel_task() => {ctx._task}\n' # f' >> Actor._cancel_task() => {ctx._task}\n'
@ -2021,11 +1628,6 @@ async def async_main(
if accept_addr_rent is not None: if accept_addr_rent is not None:
accept_addr = accept_addr_rent accept_addr = accept_addr_rent
# load exposed/allowed RPC modules
# XXX: do this **after** establishing a channel to the parent
# but **before** starting the message loop for that channel
# such that import errors are properly propagated upwards
actor.load_modules()
# The "root" nursery ensures the channel with the immediate # The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until # parent is kept alive as a resilient service until
@ -2043,7 +1645,25 @@ async def async_main(
actor._service_n = service_nursery actor._service_n = service_nursery
assert actor._service_n assert actor._service_n
# Startup up the channel server with, # load exposed/allowed RPC modules
# XXX: do this **after** establishing a channel to the parent
# but **before** starting the message loop for that channel
# such that import errors are properly propagated upwards
actor.load_modules()
# XXX TODO XXX: figuring out debugging of this
# would somemwhat guarantee "self-hosted" runtime
# debugging (since it hits all the ede cases?)
#
# `tractor.pause()` right?
# try:
# actor.load_modules()
# except ModuleNotFoundError as err:
# _debug.pause_from_sync()
# import pdbp; pdbp.set_trace()
# raise
# Startup up the transport(-channel) server with,
# - subactor: the bind address is sent by our parent # - subactor: the bind address is sent by our parent
# over our established channel # over our established channel
# - root actor: the ``accept_addr`` passed to this method # - root actor: the ``accept_addr`` passed to this method
@ -2122,7 +1742,7 @@ async def async_main(
) )
if actor._parent_chan: if actor._parent_chan:
await try_ship_error_to_parent( await try_ship_error_to_remote(
actor._parent_chan, actor._parent_chan,
err, err,
) )
@ -2532,7 +2152,7 @@ async def process_messages(
log.exception("Actor errored:") log.exception("Actor errored:")
if actor._parent_chan: if actor._parent_chan:
await try_ship_error_to_parent( await try_ship_error_to_remote(
actor._parent_chan, actor._parent_chan,
err, err,
) )

View File

@ -215,7 +215,7 @@ async def cancel_on_completion(
async def hard_kill( async def hard_kill(
proc: trio.Process, proc: trio.Process,
terminate_after: int = 3, terminate_after: int = 1.6,
# NOTE: for mucking with `.pause()`-ing inside the runtime # NOTE: for mucking with `.pause()`-ing inside the runtime
# whilst also hacking on it XD # whilst also hacking on it XD
@ -281,8 +281,11 @@ async def hard_kill(
# zombies (as a feature) we ask the OS to do send in the # zombies (as a feature) we ask the OS to do send in the
# removal swad as the last resort. # removal swad as the last resort.
if cs.cancelled_caught: if cs.cancelled_caught:
# TODO: toss in the skynet-logo face as ascii art?
log.critical( log.critical(
'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n' # 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
'#T-800 deployed to collect zombie B0\n'
f'|\n'
f'|_{proc}\n' f'|_{proc}\n'
) )
proc.kill() proc.kill()

View File

@ -114,13 +114,19 @@ class MsgStream(trio.abc.Channel):
stream=self, stream=self,
) )
async def receive(self): async def receive(
self,
hide_tb: bool = True,
):
''' '''
Receive a single msg from the IPC transport, the next in Receive a single msg from the IPC transport, the next in
sequence sent by the far end task (possibly in order as sequence sent by the far end task (possibly in order as
determined by the underlying protocol). determined by the underlying protocol).
''' '''
__tracebackhide__: bool = hide_tb
# NOTE: `trio.ReceiveChannel` implements # NOTE: `trio.ReceiveChannel` implements
# EOC handling as follows (aka uses it # EOC handling as follows (aka uses it
# to gracefully exit async for loops): # to gracefully exit async for loops):
@ -139,7 +145,7 @@ class MsgStream(trio.abc.Channel):
if self._closed: if self._closed:
raise self._closed raise self._closed
src_err: Exception|None = None src_err: Exception|None = None # orig tb
try: try:
try: try:
msg = await self._rx_chan.receive() msg = await self._rx_chan.receive()
@ -186,7 +192,7 @@ class MsgStream(trio.abc.Channel):
# TODO: Locally, we want to close this stream gracefully, by # TODO: Locally, we want to close this stream gracefully, by
# terminating any local consumers tasks deterministically. # terminating any local consumers tasks deterministically.
# One we have broadcast support, we **don't** want to be # Once we have broadcast support, we **don't** want to be
# closing this stream and not flushing a final value to # closing this stream and not flushing a final value to
# remaining (clone) consumers who may not have been # remaining (clone) consumers who may not have been
# scheduled to receive it yet. # scheduled to receive it yet.
@ -237,7 +243,12 @@ class MsgStream(trio.abc.Channel):
raise_ctxc_from_self_call=True, raise_ctxc_from_self_call=True,
) )
raise src_err # propagate # propagate any error but hide low-level frames from
# caller by default.
if hide_tb:
raise type(src_err)(*src_err.args) from src_err
else:
raise src_err
async def aclose(self) -> list[Exception|dict]: async def aclose(self) -> list[Exception|dict]:
''' '''
@ -475,23 +486,39 @@ class MsgStream(trio.abc.Channel):
async def send( async def send(
self, self,
data: Any data: Any,
hide_tb: bool = True,
) -> None: ) -> None:
''' '''
Send a message over this stream to the far end. Send a message over this stream to the far end.
''' '''
if self._ctx._remote_error: __tracebackhide__: bool = hide_tb
raise self._ctx._remote_error # from None
self._ctx.maybe_raise()
if self._closed: if self._closed:
raise self._closed raise self._closed
# raise trio.ClosedResourceError('This stream was already closed')
await self._ctx.chan.send({ try:
'yield': data, await self._ctx.chan.send(
'cid': self._ctx.cid, payload={
}) 'yield': data,
'cid': self._ctx.cid,
},
# hide_tb=hide_tb,
)
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
) as trans_err:
if hide_tb:
raise type(trans_err)(
*trans_err.args
) from trans_err
else:
raise
def stream(func: Callable) -> Callable: def stream(func: Callable) -> Callable: