forked from goodboy/tractor
1
0
Fork 0

Handle kbi in ctx blocks via `BaseException`

Fixes prior committed tests by more generally handling `BaseExcepion` in
context blocks. Left in the commented concrete list for reference.
fix_kbi_in_ctx_block
Tyler Goodlet 2021-10-04 10:05:40 -04:00
parent 8d79d83ac2
commit bd31f47d5f
1 changed files with 27 additions and 10 deletions

View File

@ -177,7 +177,6 @@ class Portal:
f"Cancelling all streams with {self.channel.uid}") f"Cancelling all streams with {self.channel.uid}")
for stream in self._streams.copy(): for stream in self._streams.copy():
try: try:
# with trio.CancelScope(shield=True):
await stream.aclose() await stream.aclose()
except trio.ClosedResourceError: except trio.ClosedResourceError:
# don't error the stream having already been closed # don't error the stream having already been closed
@ -294,7 +293,6 @@ class Portal:
async def open_stream_from( async def open_stream_from(
self, self,
async_gen_func: Callable, # typing: ignore async_gen_func: Callable, # typing: ignore
shield: bool = False,
**kwargs, **kwargs,
) -> AsyncGenerator[ReceiveMsgStream, None]: ) -> AsyncGenerator[ReceiveMsgStream, None]:
@ -318,11 +316,17 @@ class Portal:
# receive only stream # receive only stream
assert functype == 'asyncgen' assert functype == 'asyncgen'
ctx = Context(self.channel, cid, _portal=self) ctx = Context(
self.channel,
cid,
# do we need this to be closed implicitly?
# _recv_chan=recv_chan,
_portal=self
)
try: try:
# deliver receive only stream # deliver receive only stream
async with ReceiveMsgStream( async with ReceiveMsgStream(
ctx, recv_chan, shield=shield ctx, recv_chan,
) as rchan: ) as rchan:
self._streams.add(rchan) self._streams.add(rchan)
yield rchan yield rchan
@ -337,13 +341,16 @@ class Portal:
# message right now since there shouldn't be a reason to # message right now since there shouldn't be a reason to
# stop and restart the stream, right? # stop and restart the stream, right?
try: try:
with trio.CancelScope(shield=True):
await ctx.cancel() await ctx.cancel()
except trio.ClosedResourceError: except trio.ClosedResourceError:
# if the far end terminates before we send a cancel the # if the far end terminates before we send a cancel the
# underlying transport-channel may already be closed. # underlying transport-channel may already be closed.
log.debug(f'Context {ctx} was already closed?') log.warning(f'Context {ctx} was already closed?')
# XXX: should this always be done?
# await recv_chan.aclose()
self._streams.remove(rchan) self._streams.remove(rchan)
@asynccontextmanager @asynccontextmanager
@ -408,8 +415,8 @@ class Portal:
# pairs with handling in ``Actor._push_result()`` # pairs with handling in ``Actor._push_result()``
# recv_chan._ctx = ctx # recv_chan._ctx = ctx
# await trio.lowlevel.checkpoint() # await trio.lowlevel.checkpoint()
yield ctx, first yield ctx, first
except ContextCancelled as err: except ContextCancelled as err:
@ -427,9 +434,14 @@ class Portal:
log.debug(f'Context {ctx} cancelled gracefully') log.debug(f'Context {ctx} cancelled gracefully')
except ( except (
trio.Cancelled, BaseException,
trio.MultiError,
Exception, # more specifically, we need to handle:
# Exception,
# trio.Cancelled,
# trio.MultiError,
# KeyboardInterrupt,
) as err: ) as err:
_err = err _err = err
# the context cancels itself on any cancel # the context cancels itself on any cancel
@ -440,6 +452,11 @@ class Portal:
raise raise
finally: finally:
# in the case where a runtime nursery (due to internal bug)
# or a remote actor transmits an error we want to be
# sure we get the error the underlying feeder mem chan.
# if it's not raised here it *should* be raised from the
# msg loop nursery right?
result = await ctx.result() result = await ctx.result()
# though it should be impossible for any tasks # though it should be impossible for any tasks