From d3e508b7f73ccd92529a2b7a01ee6bea9167d5b5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Jun 2021 00:18:28 -0400 Subject: [PATCH] Always shield cancel the caller on cancel-causing-errors, add teardown logging --- tractor/_portal.py | 72 +++++++++++++++++++++++++++++-------------- tractor/_streaming.py | 20 +++++++----- 2 files changed, 62 insertions(+), 30 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index ed5892d..4471c8f 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -295,6 +295,7 @@ class Portal: self, async_gen_func: Callable, # typing: ignore **kwargs, + ) -> AsyncGenerator[ReceiveMsgStream, None]: if not inspect.isasyncgenfunction(async_gen_func): @@ -347,7 +348,6 @@ class Portal: self, func: Callable, - cancel_on_exit: bool = False, **kwargs, ) -> AsyncGenerator[Tuple[Context, Any], None]: @@ -359,6 +359,7 @@ class Portal: and synchronized final result collection. See ``tractor.Context``. ''' + # conduct target func method structural checks if not inspect.iscoroutinefunction(func) and ( getattr(func, '_tractor_contex_function', False) @@ -390,6 +391,7 @@ class Portal: else: raise + _err = None # deliver context instance and .started() msg value in open # tuple. try: @@ -403,26 +405,20 @@ class Portal: ) recv_chan._ctx = ctx + # await trio.lowlevel.checkpoint() yield ctx, first - log.info(f'Context for {func.__name__} completed') + # if not ctx._cancel_called: + # await ctx.result() - if cancel_on_exit: - await ctx.cancel() + # await recv_chan.aclose() - else: - if not ctx._cancel_called: - await ctx.result() - - await recv_chan.aclose() - - # except TypeError: - # # if fn_name == '_emsd_main': - # import tractor - # await tractor.breakpoint() - - except ContextCancelled: + except ContextCancelled as err: + _err = err if not ctx._cancel_called: + # context was cancelled at the far end but was + # not part of this end requesting that cancel + # so raise for the local task to respond and handle. raise # if the context was cancelled by client code @@ -431,16 +427,43 @@ class Portal: else: log.debug(f'Context {ctx} cancelled gracefully') - except trio.Cancelled: - # the context cancels itself on any deviation - await ctx.cancel() + except ( + trio.Cancelled, + trio.MultiError, + Exception, + ) as err: + _err = err + # the context cancels itself on any cancel + # causing error. + log.error(f'Context {ctx} sending cancel to far end') + with trio.CancelScope(shield=True): + await ctx.cancel() raise - # finally: - # log.info(f'Context for {func.__name__} completed') + finally: + result = await ctx.result() - # finally: - # if recv_chan is not None: + # though it should be impossible for any tasks + # operating *in* this scope to have survived + # we tear down the runtime feeder chan last + # to avoid premature stream clobbers. + if recv_chan is not None: + await recv_chan.aclose() + + if _err: + if ctx._cancel_called: + log.warning( + f'Context {fn_name} cancelled by caller with\n{_err}' + ) + elif _err is not None: + log.warning( + f'Context {fn_name} cancelled by callee with\n{_err}' + ) + else: + log.info( + f'Context {fn_name} returned ' + f'value from callee `{self._result}`' + ) @dataclass @@ -465,10 +488,12 @@ class LocalPortal: @asynccontextmanager async def open_portal( + channel: Channel, nursery: Optional[trio.Nursery] = None, start_msg_loop: bool = True, shield: bool = False, + ) -> AsyncGenerator[Portal, None]: """Open a ``Portal`` through the provided ``channel``. @@ -508,6 +533,7 @@ async def open_portal( if was_connected: # gracefully signal remote channel-msg loop await channel.send(None) + # await channel.aclose() # cancel background msg loop task if msg_loop_cs: diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 976180f..d69bd44 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -67,8 +67,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): raise trio.EndOfChannel try: - # if self._ctx.chan.uid[0] == 'brokerd.ib': - # breakpoint() msg = await self._rx_chan.receive() return msg['yield'] @@ -173,7 +171,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose rx_chan = self._rx_chan - if rx_chan._closed: # or self._eoc: + if rx_chan._closed: # or self._eoc: log.warning(f"{self} is already closed") # this stream has already been closed so silently succeed as @@ -338,6 +336,12 @@ class Context: msg: Dict[str, Any], ) -> None: + '''Unpack and raise a msg error into the local scope + nursery for this context. + + Acts as a form of "relay" for a remote error raised + in the corresponding remote callee task. + ''' async def raiser(): raise unpack_error(msg, self.chan) @@ -350,11 +354,13 @@ class Context: Timeout quickly in an attempt to sidestep 2-generals... ''' - log.warning(f'Cancelling caller side of context {self}') + side = 'caller' if self._portal else 'callee' + + log.warning(f'Cancelling {side} side of context to {self.chan}') self._cancel_called = True - if self._portal: # caller side: + if side == 'caller': if not self._portal: raise RuntimeError( "No portal found, this is likely a callee side context" @@ -382,8 +388,8 @@ class Context: "May have failed to cancel remote task " f"{cid} for {self._portal.channel.uid}") else: - # ensure callee side - assert self._scope_nursery + # callee side remote task + # TODO: should we have an explicit cancel message # or is relaying the local `trio.Cancelled` as an # {'error': trio.Cancelled, cid: "blah"} enough?