Better idea, enable backpressure on opened streams

Keeping it disabled on context open will help with detecting any stream
connection which was never opened on one side of the task pair.  In that
case we can report that there was an overrun **and** a stream wasn't
opened versus if the stream is explicitly configured not to use bp then
we throw the standard overflow.

Use `trio.Nursery._closed` to detect "closure" XD since it seems to be
the most reliable way to determine if a spawn call will trigger
a runtime error.
stricter_context_starting
Tyler Goodlet 2021-12-06 10:57:58 -05:00
parent 4ea5c9b5db
commit b826ec8103
1 changed files with 23 additions and 9 deletions

View File

@ -298,8 +298,9 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
'''
# if self._eoc:
# raise trio.ClosedResourceError('This stream is already ded')
# if self._ctx._error:
# raise self._ctx._error
if self._ctx._error:
raise self._ctx._error # from None
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
@ -309,6 +310,10 @@ class Context:
'''
An inter-actor, ``trio`` task communication context.
NB: This class should never be instatiated directly, it is delivered
by either runtime machinery to a remotely started task or by entering
``Portal.open_context()``.
Allows maintaining task or protocol specific state between
2 communicating actor tasks. A unique context is created on the
callee side/end for every request to a remote actor from a portal.
@ -370,20 +375,26 @@ class Context:
Acts as a form of "relay" for a remote error raised
in the corresponding remote callee task.
'''
self._error = unpack_error(msg, self.chan)
# TODO: tempted to **not** do this by-reraising in a
# nursery and instead cancel a surrounding scope, detect
# the cancellation, then lookup the error that was set?
if self._scope_nursery:
async def raiser():
__tracebackhide__ = True
raise self._error
raise self._error from None
if not self._scope_nursery.cancel_scope.cancel_called:
# from trio.testing import wait_all_tasks_blocked
# await wait_all_tasks_blocked()
if not self._scope_nursery._closed: # type: ignore
self._scope_nursery.start_soon(raiser)
async def cancel(self) -> None:
'''Cancel this inter-actor-task context.
'''
Cancel this inter-actor-task context.
Request that the far side cancel it's current linked context,
Timeout quickly in an attempt to sidestep 2-generals...
@ -444,7 +455,7 @@ class Context:
async def open_stream(
self,
backpressure: bool = False,
backpressure: Optional[bool] = True,
msg_buffer_size: Optional[int] = None,
) -> AsyncGenerator[MsgStream, None]:
@ -555,7 +566,7 @@ class Context:
try:
self._result = msg['return']
break
except KeyError:
except KeyError as msgerr:
if 'yield' in msg:
# far end task is still streaming to us so discard
@ -569,7 +580,10 @@ class Context:
# internal error should never get here
assert msg.get('cid'), (
"Received internal error at portal?")
raise unpack_error(msg, self._portal.channel)
raise unpack_error(
msg, self._portal.channel
) from msgerr
return self._result