From bd31f47d5f89055bfcf69dbf32b0564949ce21ea Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 10:05:40 -0400 Subject: [PATCH] Handle kbi in ctx blocks via `BaseException` Fixes prior committed tests by more generally handling `BaseExcepion` in context blocks. Left in the commented concrete list for reference. --- tractor/_portal.py | 37 +++++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 63c59ed..137761e 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -177,7 +177,6 @@ class Portal: f"Cancelling all streams with {self.channel.uid}") for stream in self._streams.copy(): try: - # with trio.CancelScope(shield=True): await stream.aclose() except trio.ClosedResourceError: # don't error the stream having already been closed @@ -294,7 +293,6 @@ class Portal: async def open_stream_from( self, async_gen_func: Callable, # typing: ignore - shield: bool = False, **kwargs, ) -> AsyncGenerator[ReceiveMsgStream, None]: @@ -318,11 +316,17 @@ class Portal: # receive only stream assert functype == 'asyncgen' - ctx = Context(self.channel, cid, _portal=self) + ctx = Context( + self.channel, + cid, + # do we need this to be closed implicitly? + # _recv_chan=recv_chan, + _portal=self + ) try: # deliver receive only stream async with ReceiveMsgStream( - ctx, recv_chan, shield=shield + ctx, recv_chan, ) as rchan: self._streams.add(rchan) yield rchan @@ -337,13 +341,16 @@ class Portal: # message right now since there shouldn't be a reason to # stop and restart the stream, right? try: - await ctx.cancel() + with trio.CancelScope(shield=True): + await ctx.cancel() except trio.ClosedResourceError: # if the far end terminates before we send a cancel the # underlying transport-channel may already be closed. - log.debug(f'Context {ctx} was already closed?') + log.warning(f'Context {ctx} was already closed?') + # XXX: should this always be done? + # await recv_chan.aclose() self._streams.remove(rchan) @asynccontextmanager @@ -408,8 +415,8 @@ class Portal: # pairs with handling in ``Actor._push_result()`` # recv_chan._ctx = ctx - # await trio.lowlevel.checkpoint() + yield ctx, first except ContextCancelled as err: @@ -427,9 +434,14 @@ class Portal: log.debug(f'Context {ctx} cancelled gracefully') except ( - trio.Cancelled, - trio.MultiError, - Exception, + BaseException, + + # more specifically, we need to handle: + # Exception, + # trio.Cancelled, + # trio.MultiError, + # KeyboardInterrupt, + ) as err: _err = err # the context cancels itself on any cancel @@ -440,6 +452,11 @@ class Portal: raise finally: + # in the case where a runtime nursery (due to internal bug) + # or a remote actor transmits an error we want to be + # sure we get the error the underlying feeder mem chan. + # if it's not raised here it *should* be raised from the + # msg loop nursery right? result = await ctx.result() # though it should be impossible for any tasks