forked from goodboy/tractor
				
			Always shield cancel the caller on cancel-causing-errors, add teardown logging
							parent
							
								
									17fca76865
								
							
						
					
					
						commit
						6f22ee8621
					
				|  | @ -295,6 +295,7 @@ class Portal: | |||
|         self, | ||||
|         async_gen_func: Callable,  # typing: ignore | ||||
|         **kwargs, | ||||
| 
 | ||||
|     ) -> AsyncGenerator[ReceiveMsgStream, None]: | ||||
| 
 | ||||
|         if not inspect.isasyncgenfunction(async_gen_func): | ||||
|  | @ -347,7 +348,6 @@ class Portal: | |||
| 
 | ||||
|         self, | ||||
|         func: Callable, | ||||
|         cancel_on_exit: bool = False, | ||||
|         **kwargs, | ||||
| 
 | ||||
|     ) -> AsyncGenerator[Tuple[Context, Any], None]: | ||||
|  | @ -359,6 +359,7 @@ class Portal: | |||
|         and synchronized final result collection. See ``tractor.Context``. | ||||
| 
 | ||||
|         ''' | ||||
| 
 | ||||
|         # conduct target func method structural checks | ||||
|         if not inspect.iscoroutinefunction(func) and ( | ||||
|             getattr(func, '_tractor_contex_function', False) | ||||
|  | @ -390,6 +391,7 @@ class Portal: | |||
|             else: | ||||
|                 raise | ||||
| 
 | ||||
|         _err = None | ||||
|         # deliver context instance and .started() msg value in open | ||||
|         # tuple. | ||||
|         try: | ||||
|  | @ -403,26 +405,20 @@ class Portal: | |||
|                 ) | ||||
|                 recv_chan._ctx = ctx | ||||
| 
 | ||||
|                 # await trio.lowlevel.checkpoint() | ||||
|                 yield ctx, first | ||||
| 
 | ||||
|             log.info(f'Context for {func.__name__} completed') | ||||
|                 # if not ctx._cancel_called: | ||||
|                 #     await ctx.result() | ||||
| 
 | ||||
|             if cancel_on_exit: | ||||
|                 await ctx.cancel() | ||||
|             # await recv_chan.aclose() | ||||
| 
 | ||||
|             else: | ||||
|                 if not ctx._cancel_called: | ||||
|                     await ctx.result() | ||||
| 
 | ||||
|             await recv_chan.aclose() | ||||
| 
 | ||||
|             # except TypeError: | ||||
|             #     # if fn_name == '_emsd_main': | ||||
|             #     import tractor | ||||
|             #     await tractor.breakpoint() | ||||
| 
 | ||||
|         except ContextCancelled: | ||||
|         except ContextCancelled as err: | ||||
|             _err = err | ||||
|             if not ctx._cancel_called: | ||||
|                 # context was cancelled at the far end but was | ||||
|                 # not part of this end requesting that cancel | ||||
|                 # so raise for the local task to respond and handle. | ||||
|                 raise | ||||
| 
 | ||||
|             # if the context was cancelled by client code | ||||
|  | @ -431,16 +427,43 @@ class Portal: | |||
|             else: | ||||
|                 log.debug(f'Context {ctx} cancelled gracefully') | ||||
| 
 | ||||
|         except trio.Cancelled: | ||||
|             # the context cancels itself on any deviation | ||||
|             await ctx.cancel() | ||||
|         except ( | ||||
|             trio.Cancelled, | ||||
|             trio.MultiError, | ||||
|             Exception, | ||||
|         ) as err: | ||||
|             _err = err | ||||
|             # the context cancels itself on any cancel | ||||
|             # causing error. | ||||
|             log.error(f'Context {ctx} sending cancel to far end') | ||||
|             with trio.CancelScope(shield=True): | ||||
|                 await ctx.cancel() | ||||
|             raise | ||||
| 
 | ||||
|         # finally: | ||||
|         #     log.info(f'Context for {func.__name__} completed') | ||||
|         finally: | ||||
|             result = await ctx.result() | ||||
| 
 | ||||
|         # finally: | ||||
|         #     if recv_chan is not None: | ||||
|             # though it should be impossible for any tasks | ||||
|             # operating *in* this scope to have survived | ||||
|             # we tear down the runtime feeder chan last | ||||
|             # to avoid premature stream clobbers. | ||||
|             if recv_chan is not None: | ||||
|                 await recv_chan.aclose() | ||||
| 
 | ||||
|             if _err: | ||||
|                 if ctx._cancel_called: | ||||
|                     log.warning( | ||||
|                         f'Context {fn_name} cancelled by caller with\n{_err}' | ||||
|                     ) | ||||
|                 elif _err is not None: | ||||
|                     log.warning( | ||||
|                         f'Context {fn_name} cancelled by callee with\n{_err}' | ||||
|                     ) | ||||
|             else: | ||||
|                 log.info( | ||||
|                     f'Context {fn_name} returned ' | ||||
|                     f'value from callee `{self._result}`' | ||||
|                 ) | ||||
| 
 | ||||
| 
 | ||||
| @dataclass | ||||
|  | @ -465,10 +488,12 @@ class LocalPortal: | |||
| 
 | ||||
| @asynccontextmanager | ||||
| async def open_portal( | ||||
| 
 | ||||
|     channel: Channel, | ||||
|     nursery: Optional[trio.Nursery] = None, | ||||
|     start_msg_loop: bool = True, | ||||
|     shield: bool = False, | ||||
| 
 | ||||
| ) -> AsyncGenerator[Portal, None]: | ||||
|     """Open a ``Portal`` through the provided ``channel``. | ||||
| 
 | ||||
|  | @ -508,6 +533,7 @@ async def open_portal( | |||
|             if was_connected: | ||||
|                 # gracefully signal remote channel-msg loop | ||||
|                 await channel.send(None) | ||||
|                 # await channel.aclose() | ||||
| 
 | ||||
|             # cancel background msg loop task | ||||
|             if msg_loop_cs: | ||||
|  |  | |||
|  | @ -67,8 +67,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|             raise trio.EndOfChannel | ||||
| 
 | ||||
|         try: | ||||
|             # if self._ctx.chan.uid[0] == 'brokerd.ib': | ||||
|             #     breakpoint() | ||||
| 
 | ||||
|             msg = await self._rx_chan.receive() | ||||
|             return msg['yield'] | ||||
|  | @ -173,7 +171,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|         # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose | ||||
|         rx_chan = self._rx_chan | ||||
| 
 | ||||
|         if rx_chan._closed:  # or self._eoc: | ||||
|         if rx_chan._closed: # or self._eoc: | ||||
|             log.warning(f"{self} is already closed") | ||||
| 
 | ||||
|             # this stream has already been closed so silently succeed as | ||||
|  | @ -338,6 +336,12 @@ class Context: | |||
|         msg: Dict[str, Any], | ||||
| 
 | ||||
|     ) -> None: | ||||
|         '''Unpack and raise a msg error into the local scope | ||||
|         nursery for this context. | ||||
| 
 | ||||
|         Acts as a form of "relay" for a remote error raised | ||||
|         in the corresponding remote callee task. | ||||
|         ''' | ||||
|         async def raiser(): | ||||
|             raise unpack_error(msg, self.chan) | ||||
| 
 | ||||
|  | @ -350,11 +354,13 @@ class Context: | |||
|         Timeout quickly in an attempt to sidestep 2-generals... | ||||
| 
 | ||||
|         ''' | ||||
|         log.warning(f'Cancelling caller side of context {self}') | ||||
|         side = 'caller' if self._portal else 'callee' | ||||
| 
 | ||||
|         log.warning(f'Cancelling {side} side of context to {self.chan}') | ||||
| 
 | ||||
|         self._cancel_called = True | ||||
| 
 | ||||
|         if self._portal:  # caller side: | ||||
|         if side == 'caller': | ||||
|             if not self._portal: | ||||
|                 raise RuntimeError( | ||||
|                     "No portal found, this is likely a callee side context" | ||||
|  | @ -382,8 +388,8 @@ class Context: | |||
|                         "May have failed to cancel remote task " | ||||
|                         f"{cid} for {self._portal.channel.uid}") | ||||
|         else: | ||||
|             # ensure callee side | ||||
|             assert self._scope_nursery | ||||
|             # callee side remote task | ||||
| 
 | ||||
|             # TODO: should we have an explicit cancel message | ||||
|             # or is relaying the local `trio.Cancelled` as an | ||||
|             # {'error': trio.Cancelled, cid: "blah"} enough? | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue