Add internal msg stream backpressure controls

In preparation for supporting both backpressure detection (through an
optional error) as well as control over the msg channel buffer size, add
internal configuration flags for both to contexts. Also adjust
`Context._err_on_from_remote_msg()` -> `._maybe..` such that it can be
called and will only raise if a scope nursery has been set. Add
a `Context._error` for stashing the remote task's error that may be
delivered in an `'error'` message.
stricter_context_starting
Tyler Goodlet 2021-12-05 19:19:53 -05:00
parent 6751349987
commit 92b540d518
1 changed files with 24 additions and 9 deletions

View File

@ -190,6 +190,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# was it shouldn't matter since it's unlikely a user # was it shouldn't matter since it's unlikely a user
# will try to re-use a stream after attemping to close # will try to re-use a stream after attemping to close
# it). # it).
with trio.CancelScope(shield=True):
await self._ctx.send_stop() await self._ctx.send_stop()
except ( except (
@ -283,11 +284,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
class MsgStream(ReceiveMsgStream, trio.abc.Channel): class MsgStream(ReceiveMsgStream, trio.abc.Channel):
""" '''
Bidirectional message stream for use within an inter-actor actor Bidirectional message stream for use within an inter-actor actor
``Context```. ``Context```.
""" '''
async def send( async def send(
self, self,
data: Any data: Any
@ -297,6 +298,8 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
''' '''
# if self._eoc: # if self._eoc:
# raise trio.ClosedResourceError('This stream is already ded') # raise trio.ClosedResourceError('This stream is already ded')
# if self._ctx._error:
# raise self._ctx._error
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
@ -330,6 +333,7 @@ class Context:
# only set on the caller side # only set on the caller side
_portal: Optional['Portal'] = None # type: ignore # noqa _portal: Optional['Portal'] = None # type: ignore # noqa
_result: Optional[Any] = False _result: Optional[Any] = False
_error: Optional[BaseException] = None
# status flags # status flags
_cancel_called: bool = False _cancel_called: bool = False
@ -340,6 +344,8 @@ class Context:
# only set on the callee side # only set on the callee side
_scope_nursery: Optional[trio.Nursery] = None _scope_nursery: Optional[trio.Nursery] = None
_backpressure: bool = False
async def send_yield(self, data: Any) -> None: async def send_yield(self, data: Any) -> None:
warnings.warn( warnings.warn(
@ -353,22 +359,27 @@ class Context:
async def send_stop(self) -> None: async def send_stop(self) -> None:
await self.chan.send({'stop': True, 'cid': self.cid}) await self.chan.send({'stop': True, 'cid': self.cid})
def _error_from_remote_msg( def _maybe_error_from_remote_msg(
self, self,
msg: Dict[str, Any], msg: Dict[str, Any],
) -> None: ) -> None:
'''Unpack and raise a msg error into the local scope '''
Unpack and raise a msg error into the local scope
nursery for this context. nursery for this context.
Acts as a form of "relay" for a remote error raised Acts as a form of "relay" for a remote error raised
in the corresponding remote callee task. in the corresponding remote callee task.
''' '''
assert self._scope_nursery self._error = unpack_error(msg, self.chan)
if self._scope_nursery:
async def raiser(): async def raiser():
raise unpack_error(msg, self.chan) __tracebackhide__ = True
raise self._error
if not self._scope_nursery.cancel_scope.cancel_called:
self._scope_nursery.start_soon(raiser) self._scope_nursery.start_soon(raiser)
async def cancel(self) -> None: async def cancel(self) -> None:
@ -433,6 +444,8 @@ class Context:
async def open_stream( async def open_stream(
self, self,
backpressure: bool = False,
msg_buffer_size: Optional[int] = None,
) -> AsyncGenerator[MsgStream, None]: ) -> AsyncGenerator[MsgStream, None]:
''' '''
@ -482,7 +495,9 @@ class Context:
ctx = actor.get_context( ctx = actor.get_context(
self.chan, self.chan,
self.cid, self.cid,
msg_buffer_size=msg_buffer_size,
) )
ctx._backpressure = backpressure
assert ctx is self assert ctx is self
# XXX: If the underlying channel feeder receive mem chan has # XXX: If the underlying channel feeder receive mem chan has